From f83101836af208771b712d0e485d3d081579c97e Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Mon, 30 Aug 2021 11:26:26 +1000
Subject: [PATCH v26] ExprState cache modifications.

Now the cached row-filter caches (e.g. ExprState list) are invalidated only in
rel_sync_cache_relation_cb function, so it means the ALTER PUBLICATION for one
table should not cause row-filters of other tables to also become invalidated.

Also all code related to caching row-filters has been removed from the
get_rel_sync_entry function and is now done just before they are needed in the
pgoutput_row_filter function.

Changes are based on a suggestions from Amit [1] [2].
[1] https://www.postgresql.org/message-id/CAA4eK1%2BxQb06NGs6Y7OzwMtKYYixEqR8tdWV5THAVE4SAqNrDg%40mail.gmail.com
[2] https://www.postgresql.org/message-id/CAA4eK1%2Btio46goUKBUfAKFsFVxtgk8nOty%3DTxKoKH-gdLzHD2g%40mail.gmail.com
---
 src/backend/replication/pgoutput/pgoutput.c | 198 +++++++++++++++++++---------
 1 file changed, 136 insertions(+), 62 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ef1ba91..ce5e1c5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -123,7 +123,15 @@ typedef struct RelationSyncEntry
 
 	bool		replicate_valid;
 	PublicationActions pubactions;
-	List	   *exprstate;			/* ExprState for row filter */
+
+	/*
+	 * Row-filter related members:
+	 * The flag 'rowfilter_valid' only means that exprstate_list is correct -
+	 * It doesn't mean that there actual is any row filter present for the
+	 * current relid.
+	 */
+	bool		rowfilter_valid;
+	List	   *exprstate_list;		/* ExprState for row filter(s) */
 	TupleTableSlot	*scantuple;		/* tuple table slot for row filter */
 
 	/*
@@ -161,7 +169,7 @@ static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 static EState *create_estate_for_relation(Relation rel);
 static ExprState *pgoutput_row_filter_init_expr(Node *rfnode);
 static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext);
-static bool pgoutput_row_filter(Relation relation, HeapTuple oldtuple,
+static bool pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple,
 								HeapTuple newtuple, RelationSyncEntry *entry);
 
 /*
@@ -731,20 +739,121 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
  * If it returns true, the change is replicated, otherwise, it is not.
  */
 static bool
-pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry)
+pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry)
 {
 	EState	   *estate;
 	ExprContext *ecxt;
 	ListCell   *lc;
 	bool		result = true;
+	Oid			relid = RelationGetRelid(relation);
+
+	/*
+	 * If the row filter caching is currently flagged "invalid" then it means we
+	 * don't know yet if there is/isn't any row filters for this relation.
+	 *
+	 * This code is usually one-time execution.
+	 */
+	if (!entry->rowfilter_valid)
+	{
+		bool 			am_partition = get_rel_relispartition(relid);
+		MemoryContext	oldctx;
+		TupleDesc		tupdesc = RelationGetDescr(relation);
+
+		/*
+		 * Create a tuple table slot for row filter. TupleDesc must live as
+		 * long as the cache remains. Release the tuple table slot if it
+		 * already exists.
+		 */
+		if (entry->scantuple != NULL)
+		{
+			ExecDropSingleTupleTableSlot(entry->scantuple);
+			entry->scantuple = NULL;
+		}
+		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+		tupdesc = CreateTupleDescCopy(tupdesc);
+		entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple);
+		MemoryContextSwitchTo(oldctx);
+
+		/*
+		 * Find if there are any row filters for this relation. If there are,
+		 * then prepare the necessary ExprState(s) and cache then in the
+		 * entry->exprstate_list.
+		 *
+		 * NOTE: All publication-table mappings must be checked.
+		 *
+		 * NOTE: If the relation is a partition and pubviaroot is true, use
+		 * the row filter of the topmost partitioned table instead of the row
+		 * filter of its own partition.
+		 */
+		foreach(lc, data->publications)
+		{
+			Publication *pub = lfirst(lc);
+			HeapTuple	rftuple;
+			Datum		rfdatum;
+			bool		rfisnull;
+			Oid			pub_relid = relid;
+
+			if (pub->pubviaroot && am_partition)
+			{
+				if (pub->alltables)
+					pub_relid = llast_oid(get_partition_ancestors(relid));
+				else
+				{
+					List	   *ancestors = get_partition_ancestors(relid);
+					ListCell   *lc2;
+
+					/*
+					 * Find the "topmost" ancestor that is in this
+					 * publication.
+					 */
+					foreach(lc2, ancestors)
+					{
+						Oid			ancestor = lfirst_oid(lc2);
+
+						if (list_member_oid(GetRelationPublications(ancestor),
+											pub->oid))
+						{
+							pub_relid = ancestor;
+						}
+					}
+				}
+			}
+
+			/*
+			 * Lookup if there is a row-filter, and if so build the ExprState for it.
+			 */
+			rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(pub_relid), ObjectIdGetDatum(pub->oid));
+			if (HeapTupleIsValid(rftuple))
+			{
+				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull);
+
+				if (!rfisnull)
+				{
+					Node	   *rfnode;
+					ExprState	*exprstate;
+
+					oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+					rfnode = stringToNode(TextDatumGetCString(rfdatum));
+					exprstate = pgoutput_row_filter_init_expr(rfnode);
+					entry->exprstate_list = lappend(entry->exprstate_list, exprstate);
+					MemoryContextSwitchTo(oldctx);
+				}
+
+				ReleaseSysCache(rftuple);
+			}
+
+		} /* loop all subscribed publications */
+
+		entry->rowfilter_valid = true;
+	}
 
 	/* Bail out if there is no row filter */
-	if (entry->exprstate == NIL)
+	if (entry->exprstate_list == NIL)
 		return true;
 
 	elog(DEBUG3, "table \"%s.%s\" has row filter",
-		 get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
-		 get_rel_name(relation->rd_id));
+		 get_namespace_name(get_rel_namespace(relid)),
+		 get_rel_name(relid));
 
 	PushActiveSnapshot(GetTransactionSnapshot());
 
@@ -761,7 +870,7 @@ pgoutput_row_filter(Relation relation, HeapTuple oldtuple, HeapTuple newtuple, R
 	 * different row filter in these publications, all row filters must be
 	 * matched in order to replicate this change.
 	 */
-	foreach(lc, entry->exprstate)
+	foreach(lc, entry->exprstate_list)
 	{
 		ExprState  *exprstate = (ExprState *) lfirst(lc);
 
@@ -840,7 +949,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				HeapTuple	tuple = &change->data.tp.newtuple->tuple;
 
 				/* Check row filter. */
-				if (!pgoutput_row_filter(relation, NULL, tuple, relentry))
+				if (!pgoutput_row_filter(data, relation, NULL, tuple, relentry))
 					break;
 
 				/*
@@ -873,7 +982,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				HeapTuple	newtuple = &change->data.tp.newtuple->tuple;
 
 				/* Check row filter. */
-				if (!pgoutput_row_filter(relation, oldtuple, newtuple, relentry))
+				if (!pgoutput_row_filter(data, relation, oldtuple, newtuple, relentry))
 					break;
 
 				maybe_send_schema(ctx, change, relation, relentry);
@@ -907,7 +1016,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				HeapTuple	oldtuple = &change->data.tp.oldtuple->tuple;
 
 				/* Check row filter. */
-				if (!pgoutput_row_filter(relation, oldtuple, NULL, relentry))
+				if (!pgoutput_row_filter(data, relation, oldtuple, NULL, relentry))
 					break;
 
 				maybe_send_schema(ctx, change, relation, relentry);
@@ -1318,10 +1427,11 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		entry->schema_sent = false;
 		entry->streamed_txns = NIL;
 		entry->replicate_valid = false;
+		entry->rowfilter_valid = false;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->scantuple = NULL;
-		entry->exprstate = NIL;
+		entry->exprstate_list = NIL;
 		entry->publish_as_relid = InvalidOid;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
@@ -1333,7 +1443,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		List	   *pubids = GetRelationPublications(relid);
 		ListCell   *lc;
 		Oid			publish_as_relid = relid;
-		TupleDesc	tupdesc = RelationGetDescr(relation);
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -1347,22 +1456,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			publications_valid = true;
 		}
 
-		/* Release tuple table slot */
-		if (entry->scantuple != NULL)
-		{
-			ExecDropSingleTupleTableSlot(entry->scantuple);
-			entry->scantuple = NULL;
-		}
-
-		/*
-		 * Create a tuple table slot for row filter. TupleDesc must live as
-		 * long as the cache remains.
-		 */
-		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-		tupdesc = CreateTupleDescCopy(tupdesc);
-		entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple);
-		MemoryContextSwitchTo(oldctx);
-
 		/*
 		 * Build publication cache. We can't use one provided by relcache as
 		 * relcache considers all publications given relation is in, but here
@@ -1372,9 +1465,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 		{
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
-			HeapTuple	rftuple;
-			Datum		rfdatum;
-			bool		rfisnull;
 
 			if (pub->alltables)
 			{
@@ -1434,33 +1524,6 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
 
-			/*
-			 * Cache row filter, if available. All publication-table mappings
-			 * must be checked. If it is a partition and pubviaroot is true,
-			 * use the row filter of the topmost partitioned table instead of
-			 * the row filter of its own partition.
-			 */
-			rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(publish_as_relid), ObjectIdGetDatum(pub->oid));
-			if (HeapTupleIsValid(rftuple))
-			{
-				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull);
-
-				if (!rfisnull)
-				{
-					Node	   *rfnode;
-					ExprState  *exprstate;
-
-					oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-					rfnode = stringToNode(TextDatumGetCString(rfdatum));
-
-					/* Prepare for expression execution */
-					exprstate = pgoutput_row_filter_init_expr(rfnode);
-					entry->exprstate = lappend(entry->exprstate, exprstate);
-					MemoryContextSwitchTo(oldctx);
-				}
-
-				ReleaseSysCache(rftuple);
-			}
 		}
 
 		list_free(pubids);
@@ -1567,6 +1630,21 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 			free_conversion_map(entry->map);
 		}
 		entry->map = NULL;
+
+		/*
+		 * Row filter cache cleanups. (Will be rebuilt later if needed).
+		 */
+		entry->rowfilter_valid = false;
+		if (entry->scantuple != NULL)
+		{
+			ExecDropSingleTupleTableSlot(entry->scantuple);
+			entry->scantuple = NULL;
+		}
+		if (entry->exprstate_list != NIL)
+		{
+			list_free_deep(entry->exprstate_list);
+			entry->exprstate_list = NIL;
+		}
 	}
 }
 
@@ -1607,10 +1685,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 		entry->pubactions.pubupdate = false;
 		entry->pubactions.pubdelete = false;
 		entry->pubactions.pubtruncate = false;
-
-		if (entry->exprstate != NIL)
-			list_free_deep(entry->exprstate);
-		entry->exprstate = NIL;
 	}
 
 	MemoryContextSwitchTo(oldctx);
-- 
1.8.3.1

