On 4/28/22 05:17, Amit Kapila wrote:
> On Thu, Apr 28, 2022 at 3:26 AM Tomas Vondra
> <tomas.von...@enterprisedb.com> wrote:
>>
>> so I've been looking at tweaking the code so that the behavior matches
>> Alvaro's expectations. It passes check-world but I'm not claiming it's
>> nowhere near commitable - the purpose is mostly to give better idea of
>> how invasive the change is etc.
>>
> 
> I was just skimming through the patch and didn't find anything related
> to initial sync handling. I feel the behavior should be same for
> initial sync and replication.
> 

Yeah, sorry for not mentioning that - my goal was to explore and try
getting the behavior in regular replication right first, before
attempting to do the same thing in tablesync.

Attached is a patch doing the same thing in tablesync. The overall idea
is to generate copy statement with CASE expressions, applying filters to
individual columns. For Alvaro's example, this generates something like

  SELECT
    (CASE WHEN (a < 0) OR (a > 0) THEN a ELSE NULL END) AS a,
    (CASE WHEN (a > 0) THEN b ELSE NULL END) AS b,
    (CASE WHEN (a < 0) THEN c ELSE NULL END) AS c
  FROM uno WHERE (a < 0) OR (a > 0)

And that seems to work fine. Similarly to regular replication we have to
use both the "total" column list (union of per-publication lists) and
per-publication (row filter + column list), but that's expected.

There's a couple options how we might optimize this for common cases.
For example if there's just a single publication, there's no need to
generate the CASE expressions - the WHERE filter will do the trick.



regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index ff8513e2d29..41c5e3413f6 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -33,7 +33,9 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel,
 								   Bitmapset *columns);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
 								   TupleTableSlot *slot,
-								   bool binary, Bitmapset *columns);
+								   bool binary,
+								   Bitmapset *schema_columns,
+								   Bitmapset *columns);
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
 
@@ -412,7 +414,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						TupleTableSlot *newslot, bool binary, Bitmapset *columns)
+						TupleTableSlot *newslot, bool binary,
+						Bitmapset *schema_columns, Bitmapset *columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -424,7 +427,8 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns);
+	logicalrep_write_tuple(out, rel, newslot, binary,
+						   schema_columns, columns);
 }
 
 /*
@@ -457,7 +461,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 						TupleTableSlot *oldslot, TupleTableSlot *newslot,
-						bool binary, Bitmapset *columns)
+						bool binary, Bitmapset *schema_columns,
+						Bitmapset *columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -478,11 +483,12 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
+		logicalrep_write_tuple(out, rel, oldslot, binary, NULL, NULL);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns);
+	logicalrep_write_tuple(out, rel, newslot, binary,
+						   schema_columns, columns);
 }
 
 /*
@@ -551,7 +557,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
+	logicalrep_write_tuple(out, rel, oldslot, binary, NULL, NULL);
 }
 
 /*
@@ -766,7 +772,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  */
 static void
 logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
-					   bool binary, Bitmapset *columns)
+					   bool binary,
+					   Bitmapset *schema_columns, Bitmapset *columns)
 {
 	TupleDesc	desc;
 	Datum	   *values;
@@ -783,7 +790,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
-		if (!column_in_column_list(att->attnum, columns))
+		if (!column_in_column_list(att->attnum, schema_columns))
 			continue;
 
 		nliveatts++;
@@ -804,10 +811,23 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
-		if (!column_in_column_list(att->attnum, columns))
+		/*
+		 * Columns that are not in schema (union of column lists) should
+		 * be skipped entirely.
+		 */
+		if (!column_in_column_list(att->attnum, schema_columns))
 			continue;
 
-		if (isnull[i])
+		/*
+		 * Columns not in the column list (derived consindering row filters)
+		 * we just send NULL.
+		 *
+		 * XXX Not sure this is quite correct, though. Imagine you replicate
+		 * values for columns (A,B), but it changes the row filter. Can we
+		 * send NULL that would overwrite "proper" value replicated earlier?
+		 */
+		if (isnull[i] ||
+			!column_in_column_list(att->attnum, columns))
 		{
 			pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
 			continue;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 49ceec3bdc8..fd547e16f4a 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -126,6 +126,20 @@ static bool FetchTableStates(bool *started_tx);
 
 StringInfo	copybuf = NULL;
 
+/*
+ * Info use to track and evaluate row filters for each publication the relation
+ * is included in, and calculat ethe column list.
+ */
+typedef struct PublicationInfo {
+
+	/* row filter (expression state) */
+	Node	   *rowfilter;
+
+	/* column list */
+	Bitmapset  *columns;
+
+} PublicationInfo;
+
 /*
  * Exit routine for synchronization worker.
  */
@@ -696,14 +710,14 @@ copy_read_data(void *outbuf, int minread, int maxread)
  */
 static void
 fetch_remote_table_info(char *nspname, char *relname,
-						LogicalRepRelation *lrel, List **qual)
+						LogicalRepRelation *lrel, List **pubinfos)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
 	Oid			attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
-	Oid			qualRow[] = {TEXTOID};
+	Oid			qualRow[] = {TEXTOID, INT2VECTOROID};
 	bool		isnull;
 	int			natt;
 	ListCell   *lc;
@@ -878,6 +892,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	/* We don't know the number of rows coming, so allocate enough space. */
 	lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *));
 	lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid));
+	lrel->attnums = palloc0(MaxTupleAttributeNumber * sizeof(int16));
 	lrel->attkeys = NULL;
 
 	/*
@@ -905,6 +920,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 		Assert(!isnull);
 
 		lrel->attnames[natt] = rel_colname;
+		lrel->attnums[natt] = attnum;
 		lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull));
 		Assert(!isnull);
 
@@ -943,6 +959,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 	 * 3) one of the subscribed publications is declared as ALL TABLES IN
 	 * SCHEMA that includes this relation
 	 */
+	*pubinfos = NIL;
 	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
 	{
 		StringInfoData pub_names;
@@ -965,7 +982,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 		/* Check for row filters. */
 		resetStringInfo(&cmd);
 		appendStringInfo(&cmd,
-						 "SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid)"
+						 "SELECT DISTINCT pg_get_expr(pr.prqual, pr.prrelid), pr.prattrs"
 						 "  FROM pg_publication p"
 						 "  LEFT OUTER JOIN pg_publication_rel pr"
 						 "       ON (p.oid = pr.prpubid AND pr.prrelid = %u),"
@@ -976,7 +993,7 @@ fetch_remote_table_info(char *nspname, char *relname,
 						 lrel->remoteid,
 						 pub_names.data);
 
-		res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
+		res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, lengthof(qualRow), qualRow);
 
 		if (res->status != WALRCV_OK_TUPLES)
 			ereport(ERROR,
@@ -993,21 +1010,31 @@ fetch_remote_table_info(char *nspname, char *relname,
 		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
 		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
 		{
+			PublicationInfo *pubinfo;
 			Datum		rf = slot_getattr(slot, 1, &isnull);
+			Datum		cl;
+
+			pubinfo = (PublicationInfo	*) palloc0(sizeof(PublicationInfo));
+
+			if (!isnull)
+				pubinfo->rowfilter = (Node *) makeString(TextDatumGetCString(rf));
+
+			cl = slot_getattr(slot, 2, &isnull);
 
 			if (!isnull)
-				*qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
-			else
 			{
-				/* Ignore filters and cleanup as necessary. */
-				if (*qual)
+				int i;
+				int2vector *prattrs = (int2vector *) cl;
+
+				for (i = 0; i < prattrs->dim1; i++)
 				{
-					list_free_deep(*qual);
-					*qual = NIL;
+					pubinfo->columns = bms_add_member(pubinfo->columns,
+													  prattrs->values[i]);
 				}
-				break;
 			}
 
+			*pubinfos = lappend(*pubinfos, pubinfo);
+
 			ExecClearTuple(slot);
 		}
 		ExecDropSingleTupleTableSlot(slot);
@@ -1028,7 +1055,7 @@ copy_table(Relation rel)
 {
 	LogicalRepRelMapEntry *relmapentry;
 	LogicalRepRelation lrel;
-	List	   *qual = NIL;
+	List	   *pubinfos = NIL;
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	CopyFromState cstate;
@@ -1037,7 +1064,7 @@ copy_table(Relation rel)
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
-							RelationGetRelationName(rel), &lrel, &qual);
+							RelationGetRelationName(rel), &lrel, &pubinfos);
 
 	/* Put the relation into relmap. */
 	logicalrep_relmap_update(&lrel);
@@ -1050,7 +1077,9 @@ copy_table(Relation rel)
 	initStringInfo(&cmd);
 
 	/* Regular table with no row filter */
-	if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+	/* FIXME pubinfos is never NULL now, need to detect absence of row filters
+	 * in a different way */
+	if (lrel.relkind == RELKIND_RELATION && pubinfos == NIL)
 	{
 		appendStringInfo(&cmd, "COPY %s (",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
@@ -1076,11 +1105,55 @@ copy_table(Relation rel)
 		 * (SELECT ...), but we can't just do SELECT * because we need to not
 		 * copy generated columns. For tables with any row filters, build a
 		 * SELECT query with OR'ed row filters for COPY.
+		 *
+		 * FIXME can be simplified if all subscriptions have the same column
+		 * list (or no column list), in which case we don't need the CASE
+		 * expressions at all.
 		 */
 		appendStringInfoString(&cmd, "COPY (SELECT ");
 		for (int i = 0; i < lrel.natts; i++)
 		{
-			appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+			ListCell   *lc;
+			StringInfoData qual;
+			bool		no_filter = false;
+			bool		is_first = true;
+
+			initStringInfo(&qual);
+
+			/* find all row filters for the column, combine them using OR */
+			foreach (lc, pubinfos)
+			{
+				PublicationInfo *pubinfo = (PublicationInfo *) lfirst(lc);
+
+				/* not included in this publication column list */
+				if (pubinfo->columns != NULL &&
+					!bms_is_member(lrel.attnums[i], pubinfo->columns))
+					continue;
+
+				/* covered by this publication, is there an expression? */
+				if (pubinfo->rowfilter == NULL)
+				{
+					no_filter = true;
+					break;
+				}
+
+				if (is_first)
+				{
+					appendStringInfo(&qual, "%s", strVal(pubinfo->rowfilter));
+					is_first = false;
+				}
+				else
+					appendStringInfo(&qual, " OR %s", strVal(pubinfo->rowfilter));
+			}
+
+			if (no_filter)
+				appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+			else
+				appendStringInfo(&cmd, "(CASE WHEN (%s) THEN %s ELSE NULL END) AS %s",
+								 qual.data,
+								 quote_identifier(lrel.attnames[i]),
+								 quote_identifier(lrel.attnames[i]));
+			
 			if (i < lrel.natts - 1)
 				appendStringInfoString(&cmd, ", ");
 		}
@@ -1095,6 +1168,24 @@ copy_table(Relation rel)
 			appendStringInfoString(&cmd, "ONLY ");
 
 		appendStringInfoString(&cmd, quote_qualified_identifier(lrel.nspname, lrel.relname));
+
+		{
+			List *qual = NIL;
+			ListCell *lc;
+
+			foreach (lc, pubinfos)
+			{
+				PublicationInfo *pubinfo = (PublicationInfo *) lfirst(lc);
+
+				if (pubinfo->rowfilter == NULL)
+				{
+					qual = NIL;
+					break;
+				}
+
+				qual = lappend(qual, pubinfo->rowfilter);
+			}
+
 		/* list of OR'ed filters */
 		if (qual != NIL)
 		{
@@ -1110,6 +1201,8 @@ copy_table(Relation rel)
 			list_free_deep(qual);
 		}
 
+		}
+
 		appendStringInfoString(&cmd, ") TO STDOUT");
 	}
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b197bfd565d..85456e6d9f5 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -137,13 +137,13 @@ typedef struct RelationSyncEntry
 	PublicationActions pubactions;
 
 	/*
-	 * ExprState array for row filter. Different publication actions don't
-	 * allow multiple expressions to always be combined into one, because
-	 * updates or deletes restrict the column in expression to be part of the
-	 * replica identity index whereas inserts do not have this restriction, so
-	 * there is one ExprState per publication action.
+	 * Info about row filters and column lists for each publication this
+	 * relation is included in. We keep a list with per-publication info in
+	 * order to calculate appropriate column list depending on which row
+	 * filter(s) match. Each element contains an ExprState for the filter
+	 * and column list.
 	 */
-	ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
+	dlist_head	pubinfos;		/* one per publication */
 	EState	   *estate;			/* executor state used for row filter */
 	TupleTableSlot *new_slot;	/* slot for storing new tuple */
 	TupleTableSlot *old_slot;	/* slot for storing old tuple */
@@ -208,6 +208,29 @@ typedef struct PGOutputTxnData
 								 * been sent */
 }		PGOutputTxnData;
 
+/*
+ * Info use to track and evaluate row filters for each publication the relation
+ * is included in, and calculat ethe column list.
+ */
+typedef struct PublicationInfo {
+
+	/* doubly-linked list */
+	dlist_node	node;
+
+	/* publication OID (XXX not really needed) */
+	Oid			oid;
+
+	/* row filter (expression state) */
+	ExprState  *rowfilter;
+
+	/* column list */
+	Bitmapset  *columns;
+
+	/* actions published by the publication */
+	PublicationActions	pubactions;
+
+} PublicationInfo;
+
 /* Map used to remember which relation schemas we sent. */
 static HTAB *RelationSyncCache = NULL;
 
@@ -235,12 +258,8 @@ static bool pgoutput_row_filter_exec_expr(ExprState *state,
 static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 								TupleTableSlot **new_slot_ptr,
 								RelationSyncEntry *entry,
-								ReorderBufferChangeType *action);
-
-/* column list routines */
-static void pgoutput_column_list_init(PGOutputData *data,
-									  List *publications,
-									  RelationSyncEntry *entry);
+								ReorderBufferChangeType *action,
+								Bitmapset **column_list);
 
 /*
  * Specify output plugin callbacks
@@ -822,18 +841,24 @@ pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
 }
 
 /*
- * Initialize the row filter.
+ * Initialize the row filter and column list.
+ *
+ * Prepare information (ExprState, etc) used to evaluate per-publication row
+ * filters, and column lists.
+ *
+ * We also calculate a "total" column list as a union of all per-publication
+ * column lists, irrespectedly of row filters. This is used to send schema
+ * for the relsync entry, etc.
  */
 static void
 pgoutput_row_filter_init(PGOutputData *data, List *publications,
 						 RelationSyncEntry *entry)
 {
 	ListCell   *lc;
-	List	   *rfnodes[] = {NIL, NIL, NIL};	/* One per pubaction */
-	bool		no_filter[] = {false, false, false};	/* One per pubaction */
 	MemoryContext oldctx;
-	int			idx;
-	bool		has_filter = true;
+	bool		all_columns = false;
+
+	dlist_init(&entry->pubinfos);
 
 	/*
 	 * Find if there are any row filters for this relation. If there are, then
@@ -855,7 +880,12 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
 		Publication *pub = lfirst(lc);
 		HeapTuple	rftuple = NULL;
 		Datum		rfdatum = 0;
+		Datum		cfdatum = 0;
 		bool		pub_no_filter = false;
+		bool		pub_no_list = false;
+		Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
+
+		PublicationInfo *pubinfo = NULL;
 
 		if (pub->alltables)
 		{
@@ -865,6 +895,7 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
 			 * publications it does).
 			 */
 			pub_no_filter = true;
+			pub_no_list = true;
 		}
 		else
 		{
@@ -881,191 +912,75 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
 				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
 										  Anum_pg_publication_rel_prqual,
 										  &pub_no_filter);
+
+				/*
+				 * Lookup the column list attribute.
+				 *
+				 * Null indicates no list.
+				 */
+				cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
+										  Anum_pg_publication_rel_prattrs,
+										  &pub_no_list);
 			}
 			else
 			{
 				pub_no_filter = true;
+				pub_no_list = true;
 			}
 		}
 
-		if (pub_no_filter)
-		{
-			if (rftuple)
-				ReleaseSysCache(rftuple);
-
-			no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
-			no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
-			no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
-
-			/*
-			 * Quick exit if all the DML actions are publicized via this
-			 * publication.
-			 */
-			if (no_filter[PUBACTION_INSERT] &&
-				no_filter[PUBACTION_UPDATE] &&
-				no_filter[PUBACTION_DELETE])
-			{
-				has_filter = false;
-				break;
-			}
-
-			/* No additional work for this publication. Next one. */
-			continue;
-		}
-
-		/* Form the per pubaction row filter lists. */
-		if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
-			rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
-												TextDatumGetCString(rfdatum));
-		if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
-			rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
-												TextDatumGetCString(rfdatum));
-		if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
-			rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
-												TextDatumGetCString(rfdatum));
-
-		ReleaseSysCache(rftuple);
-	}							/* loop all subscribed publications */
-
-	/* Clean the row filter */
-	for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
-	{
-		if (no_filter[idx])
-		{
-			list_free_deep(rfnodes[idx]);
-			rfnodes[idx] = NIL;
-		}
-	}
-
-	if (has_filter)
-	{
-		Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
-
 		pgoutput_ensure_entry_cxt(data, entry);
 
-		/*
-		 * Now all the filters for all pubactions are known. Combine them when
-		 * their pubactions are the same.
-		 */
 		oldctx = MemoryContextSwitchTo(entry->entry_cxt);
-		entry->estate = create_estate_for_relation(relation);
-		for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
-		{
-			List	   *filters = NIL;
-			Expr	   *rfnode;
 
-			if (rfnodes[idx] == NIL)
-				continue;
-
-			foreach(lc, rfnodes[idx])
-				filters = lappend(filters, stringToNode((char *) lfirst(lc)));
-
-			/* combine the row filter and cache the ExprState */
-			rfnode = make_orclause(filters);
-			entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
-		}						/* for each pubaction */
-		MemoryContextSwitchTo(oldctx);
-
-		RelationClose(relation);
-	}
-}
+		pubinfo = (PublicationInfo *) palloc0(sizeof(PublicationInfo));
 
-/*
- * Initialize the column list.
- */
-static void
-pgoutput_column_list_init(PGOutputData *data, List *publications,
-						  RelationSyncEntry *entry)
-{
-	ListCell   *lc;
+		pubinfo->oid = pub->oid;
+		pubinfo->pubactions = pub->pubactions;
 
-	/*
-	 * Find if there are any column lists for this relation. If there are,
-	 * build a bitmap merging all the column lists.
-	 *
-	 * All the given publication-table mappings must be checked.
-	 *
-	 * Multiple publications might have multiple column lists for this relation.
-	 *
-	 * FOR ALL TABLES and FOR ALL TABLES IN SCHEMA implies "don't use column
-	 * list" so it takes precedence.
-	 */
-	foreach(lc, publications)
-	{
-		Publication *pub = lfirst(lc);
-		HeapTuple	cftuple = NULL;
-		Datum		cfdatum = 0;
+		if (!pub_no_filter)
+		{
+			entry->estate = create_estate_for_relation(relation);
 
-		/*
-		 * Assume there's no column list. Only if we find pg_publication_rel
-		 * entry with a column list we'll switch it to false.
-		 */
-		bool		pub_no_list = true;
+			pubinfo->rowfilter
+				= ExecPrepareExpr(stringToNode(TextDatumGetCString(rfdatum)),
+								  entry->estate);
+		}
 
 		/*
-		 * If the publication is FOR ALL TABLES then it is treated the same as if
-		 * there are no column lists (even if other publications have a list).
+		 * Build the column list bitmap in the per-entry context.
+		 *
+		 * We need to merge column lists from all publications, so we
+		 * update the same bitmapset. If the column list is null, we
+		 * interpret it as replicating all columns.
 		 */
-		if (!pub->alltables)
+		pubinfo->columns = NULL;
+		if (!pub_no_list)	/* when not null */
 		{
-			/*
-			 * Check for the presence of a column list in this publication.
-			 *
-			 * Note: If we find no pg_publication_rel row, it's a publication
-			 * defined for a whole schema, so it can't have a column list, just
-			 * like a FOR ALL TABLES publication.
-			 */
-			cftuple = SearchSysCache2(PUBLICATIONRELMAP,
-									  ObjectIdGetDatum(entry->publish_as_relid),
-									  ObjectIdGetDatum(pub->oid));
-
-			if (HeapTupleIsValid(cftuple))
-			{
-				/*
-				 * Lookup the column list attribute.
-				 *
-				 * Note: We update the pub_no_list value directly, because if
-				 * the value is NULL, we have no list (and vice versa).
-				 */
-				cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
-										  Anum_pg_publication_rel_prattrs,
-										  &pub_no_list);
-
-				/*
-				 * Build the column list bitmap in the per-entry context.
-				 *
-				 * We need to merge column lists from all publications, so we
-				 * update the same bitmapset. If the column list is null, we
-				 * interpret it as replicating all columns.
-				 */
-				if (!pub_no_list)	/* when not null */
-				{
-					pgoutput_ensure_entry_cxt(data, entry);
+			pubinfo->columns = pub_collist_to_bitmapset(pubinfo->columns,
+														cfdatum,
+														entry->entry_cxt);
 
-					entry->columns = pub_collist_to_bitmapset(entry->columns,
-															  cfdatum,
-															  entry->entry_cxt);
-				}
-			}
+			entry->columns = pub_collist_to_bitmapset(entry->columns,
+													  cfdatum,
+													  entry->entry_cxt);
 		}
+		else
+			all_columns = true;
 
-		/*
-		 * Found a publication with no column list, so we're done. But first
-		 * discard column list we might have from preceding publications.
-		 */
-		if (pub_no_list)
-		{
-			if (cftuple)
-				ReleaseSysCache(cftuple);
+		if (HeapTupleIsValid(rftuple))
+			ReleaseSysCache(rftuple);
 
-			bms_free(entry->columns);
-			entry->columns = NULL;
+		MemoryContextSwitchTo(oldctx);
+		RelationClose(relation);
 
-			break;
-		}
+		dlist_push_tail(&entry->pubinfos, &pubinfo->node);
 
-		ReleaseSysCache(cftuple);
 	}							/* loop all subscribed publications */
+
+	/* any of the publications replicates all columns */
+	if (all_columns)
+		entry->columns = NULL;
 }
 
 /*
@@ -1115,7 +1030,8 @@ init_tuple_slot(PGOutputData *data, Relation relation,
 }
 
 /*
- * Change is checked against the row filter if any.
+ * Change is checked against the row filter if any, and calculate the column
+ * list applicable to the operation (with respect to matching row filters).
  *
  * Returns true if the change is to be replicated, else false.
  *
@@ -1136,6 +1052,8 @@ init_tuple_slot(PGOutputData *data, Relation relation,
  *
  * The new action is updated in the action parameter.
  *
+ * The calculated column list is returned in the column_list parameter.
+ *
  * The new slot could be updated when transforming the UPDATE into INSERT,
  * because the original new tuple might not have column values from the replica
  * identity.
@@ -1167,17 +1085,21 @@ init_tuple_slot(PGOutputData *data, Relation relation,
 static bool
 pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 					TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
-					ReorderBufferChangeType *action)
+					ReorderBufferChangeType *action, Bitmapset **column_list)
 {
 	TupleDesc	desc;
 	int			i;
-	bool		old_matched,
-				new_matched,
+	bool		old_matched_any = false,
+				new_matched_any = false,
 				result;
-	TupleTableSlot *tmp_new_slot;
+	TupleTableSlot *tmp_new_slot = NULL;
 	TupleTableSlot *new_slot = *new_slot_ptr;
-	ExprContext *ecxt;
-	ExprState  *filter_exprstate;
+	dlist_iter	iter;
+	bool		matching = false;
+
+	/* Column list calculated from publications matching the row filter. */
+	Bitmapset  *columns = NULL;
+	bool		all_columns = false;
 
 	/*
 	 * We need this map to avoid relying on ReorderBufferChangeType enums
@@ -1195,115 +1117,191 @@ pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 
 	Assert(new_slot || old_slot);
 
-	/* Get the corresponding row filter */
-	filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
+	dlist_foreach(iter, &entry->pubinfos)
+	{
+		ExprContext *ecxt;
+		bool		old_matched,
+					new_matched;
 
-	/* Bail out if there is no row filter */
-	if (!filter_exprstate)
-		return true;
+		PublicationInfo  *pubinfo
+			= dlist_container(PublicationInfo, node, iter.cur);
 
-	elog(DEBUG3, "table \"%s.%s\" has row filter",
-		 get_namespace_name(RelationGetNamespace(relation)),
-		 RelationGetRelationName(relation));
+		/* ignore publications not replicating this action */
+		if ((*action == REORDER_BUFFER_CHANGE_INSERT) &&
+			(!pubinfo->pubactions.pubinsert))
+			continue;
+		else if ((*action == REORDER_BUFFER_CHANGE_UPDATE) &&
+				 (!pubinfo->pubactions.pubupdate))
+			continue;
+		else if ((*action == REORDER_BUFFER_CHANGE_DELETE) &&
+			(!pubinfo->pubactions.pubdelete))
+			continue;
 
-	ResetPerTupleExprContext(entry->estate);
+		if (!pubinfo->rowfilter)
+		{
+			matching = true;
 
-	ecxt = GetPerTupleExprContext(entry->estate);
+			/*
+			 * Update/merge the column list.
+			 *
+			 * If the publication has no column list, we interpret it as a list
+			 * with all columns. Otherwise we just add it to the bitmap.
+			 *
+			 * FIXME This is repeated in three places. Maybe refactor?
+			 */
+			if (!pubinfo->columns)
+			{
+				all_columns = true;
+				bms_free(columns);
+				columns = NULL;
+			}
+			else if (!all_columns)
+				columns = bms_union(columns, pubinfo->columns);
 
-	/*
-	 * For the following occasions where there is only one tuple, we can
-	 * evaluate the row filter for that tuple and return.
-	 *
-	 * For inserts, we only have the new tuple.
-	 *
-	 * For updates, we can have only a new tuple when none of the replica
-	 * identity columns changed and none of those columns have external data
-	 * but we still need to evaluate the row filter for the new tuple as the
-	 * existing values of those columns might not match the filter. Also, users
-	 * can use constant expressions in the row filter, so we anyway need to
-	 * evaluate it for the new tuple.
-	 *
-	 * For deletes, we only have the old tuple.
-	 */
-	if (!new_slot || !old_slot)
-	{
-		ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
-		result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+			continue;
+		}
 
-		return result;
-	}
+		elog(DEBUG3, "table \"%s.%s\" has row filter",
+			get_namespace_name(RelationGetNamespace(relation)),
+			RelationGetRelationName(relation));
 
-	/*
-	 * Both the old and new tuples must be valid only for updates and need to
-	 * be checked against the row filter.
-	 */
-	Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
+		ResetPerTupleExprContext(entry->estate);
+
+		ecxt = GetPerTupleExprContext(entry->estate);
 
-	slot_getallattrs(new_slot);
-	slot_getallattrs(old_slot);
+		/*
+		 * For the following occasions where there is only one tuple, we can
+		 * evaluate the row filter for that tuple and return.
+		 *
+		 * For inserts, we only have the new tuple.
+		 *
+		 * For updates, we can have only a new tuple when none of the replica
+		 * identity columns changed and none of those columns have external data
+		 * but we still need to evaluate the row filter for the new tuple as the
+		 * existing values of those columns might not match the filter. Also, users
+		 * can use constant expressions in the row filter, so we anyway need to
+		 * evaluate it for the new tuple.
+		 *
+		 * For deletes, we only have the old tuple.
+		 */
+		if (!new_slot || !old_slot)
+		{
+			ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
+			result = pgoutput_row_filter_exec_expr(pubinfo->rowfilter, ecxt);
 
-	tmp_new_slot = NULL;
-	desc = RelationGetDescr(relation);
+			matching |= result;
 
-	/*
-	 * The new tuple might not have all the replica identity columns, in which
-	 * case it needs to be copied over from the old tuple.
-	 */
-	for (i = 0; i < desc->natts; i++)
-	{
-		Form_pg_attribute att = TupleDescAttr(desc, i);
+			/*
+			 * FIXME refactor to reuse this code in multiple places
+			 * 
+			 * XXX Should this only update column list when using new slot?
+			 * If evaluationg old slot, that's delete, no?
+			 */
+			if (result)
+			{
+				if (!pubinfo->columns)
+				{
+					all_columns = true;
+					bms_free(columns);
+					columns = NULL;
+				}
+				else if (!all_columns)
+				{
+					columns = bms_union(columns, pubinfo->columns);
+				}
+			}
+
+			continue;
+		}
 
 		/*
-		 * if the column in the new tuple or old tuple is null, nothing to do
+		 * Both the old and new tuples must be valid only for updates and need to
+		 * be checked against the row filter.
 		 */
-		if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
-			continue;
+		Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
+
+		slot_getallattrs(new_slot);
+		slot_getallattrs(old_slot);
+
+		tmp_new_slot = NULL;
+		desc = RelationGetDescr(relation);
 
 		/*
-		 * Unchanged toasted replica identity columns are only logged in the
-		 * old tuple. Copy this over to the new tuple. The changed (or WAL
-		 * Logged) toast values are always assembled in memory and set as
-		 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
+		 * The new tuple might not have all the replica identity columns, in which
+		 * case it needs to be copied over from the old tuple.
 		 */
-		if (att->attlen == -1 &&
-			VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
-			!VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
+		for (i = 0; i < desc->natts; i++)
 		{
-			if (!tmp_new_slot)
+			Form_pg_attribute att = TupleDescAttr(desc, i);
+
+			/*
+			 * if the column in the new tuple or old tuple is null, nothing to do
+			 */
+			if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
+				continue;
+
+			/*
+			 * Unchanged toasted replica identity columns are only logged in the
+			 * old tuple. Copy this over to the new tuple. The changed (or WAL
+			 * Logged) toast values are always assembled in memory and set as
+			 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
+			 */
+			if (att->attlen == -1 &&
+				VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
+				!VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
 			{
-				tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
-				ExecClearTuple(tmp_new_slot);
+				if (!tmp_new_slot)
+				{
+					tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
+					ExecClearTuple(tmp_new_slot);
 
-				memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
-					   desc->natts * sizeof(Datum));
-				memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
-					   desc->natts * sizeof(bool));
-			}
+					memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
+						   desc->natts * sizeof(Datum));
+					memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
+						   desc->natts * sizeof(bool));
+				}
 
-			tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
-			tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
+				tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
+				tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
+			}
 		}
-	}
 
-	ecxt->ecxt_scantuple = old_slot;
-	old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+		ecxt->ecxt_scantuple = old_slot;
 
-	if (tmp_new_slot)
-	{
-		ExecStoreVirtualTuple(tmp_new_slot);
-		ecxt->ecxt_scantuple = tmp_new_slot;
-	}
-	else
-		ecxt->ecxt_scantuple = new_slot;
+		old_matched = pgoutput_row_filter_exec_expr(pubinfo->rowfilter, ecxt);
+		old_matched_any |= old_matched;
+
+		if (tmp_new_slot)
+		{
+			ExecStoreVirtualTuple(tmp_new_slot);
+			ecxt->ecxt_scantuple = tmp_new_slot;
+		}
+		else
+			ecxt->ecxt_scantuple = new_slot;
 
-	new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+		new_matched = pgoutput_row_filter_exec_expr(pubinfo->rowfilter, ecxt);
+		new_matched_any |= new_matched;
 
-	/*
-	 * Case 1: if both tuples don't match the row filter, bailout. Send
-	 * nothing.
-	 */
-	if (!old_matched && !new_matched)
-		return false;
+		/*
+		 * Case 1: if both tuples don't match the row filter, bailout. Send
+		 * nothing.
+		 */
+		if (!old_matched && !new_matched)
+			continue;	/* continue with the next row filter */
+
+		/*
+		 * Case 4: if both tuples match the row filter, transformation isn't
+		 * required. (*action is default UPDATE).
+		 */
+		if (!pubinfo->columns)
+		{
+			all_columns = true;
+			bms_free(columns);
+			columns = NULL;
+		}
+		else if (!all_columns && new_matched)
+			columns = bms_union(columns, pubinfo->columns);
+	}
 
 	/*
 	 * Case 2: if the old tuple doesn't satisfy the row filter but the new
@@ -1314,9 +1312,10 @@ pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 	 * while inserting the tuple in the downstream node, we have all the
 	 * required column values.
 	 */
-	if (!old_matched && new_matched)
+	if (!old_matched_any && new_matched_any)
 	{
 		*action = REORDER_BUFFER_CHANGE_INSERT;
+		matching = true;
 
 		if (tmp_new_slot)
 			*new_slot_ptr = tmp_new_slot;
@@ -1329,15 +1328,18 @@ pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 	 * This transformation does not require another tuple. The Old tuple will
 	 * be used for DELETE.
 	 */
-	else if (old_matched && !new_matched)
+	else if (old_matched_any && !new_matched_any)
+	{
 		*action = REORDER_BUFFER_CHANGE_DELETE;
+		matching = true;
+	}
+	else if (old_matched_any && new_matched_any)
+		matching = true;
 
-	/*
-	 * Case 4: if both tuples match the row filter, transformation isn't
-	 * required. (*action is default UPDATE).
-	 */
+	if (column_list)
+		*column_list = columns;
 
-	return true;
+	return matching;
 }
 
 /*
@@ -1359,6 +1361,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	ReorderBufferChangeType action = change->action;
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
+	Bitmapset	   *columns = NULL;
 
 	if (!is_publishable_relation(relation))
 		return;
@@ -1423,7 +1426,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 			/* Check row filter */
 			if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
-									 &action))
+									 &action, &columns))
 				break;
 
 			/*
@@ -1444,7 +1447,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 			OutputPluginPrepareWrite(ctx, true);
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
-									data->binary, relentry->columns);
+									data->binary,
+									relentry->columns, columns);
 			OutputPluginWrite(ctx, true);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
@@ -1483,7 +1487,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 			/* Check row filter */
 			if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-									 relentry, &action))
+									 relentry, &action, &columns))
 				break;
 
 			/* Send BEGIN if we haven't yet */
@@ -1503,12 +1507,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_INSERT:
 					logicalrep_write_insert(ctx->out, xid, targetrel,
 											new_slot, data->binary,
-											relentry->columns);
+											relentry->columns, columns);
 					break;
 				case REORDER_BUFFER_CHANGE_UPDATE:
 					logicalrep_write_update(ctx->out, xid, targetrel,
 											old_slot, new_slot, data->binary,
-											relentry->columns);
+											relentry->columns, columns);
 					break;
 				case REORDER_BUFFER_CHANGE_DELETE:
 					logicalrep_write_delete(ctx->out, xid, targetrel,
@@ -1547,7 +1551,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				/* Check row filter */
 				if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-										 relentry, &action))
+										 relentry, &action, NULL))
 					break;
 
 				/* Send BEGIN if we haven't yet */
@@ -1977,7 +1981,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->new_slot = NULL;
 		entry->old_slot = NULL;
-		memset(entry->exprstate, 0, sizeof(entry->exprstate));
+		dlist_init(&entry->pubinfos);
 		entry->entry_cxt = NULL;
 		entry->publish_as_relid = InvalidOid;
 		entry->columns = NULL;
@@ -2056,7 +2060,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 
 		entry->entry_cxt = NULL;
 		entry->estate = NULL;
-		memset(entry->exprstate, 0, sizeof(entry->exprstate));
+		dlist_init(&entry->pubinfos);
 
 		/*
 		 * Build publication cache. We can't use one provided by relcache as
@@ -2192,11 +2196,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			/* Initialize the tuple slot and map */
 			init_tuple_slot(data, relation, entry);
 
-			/* Initialize the row filter */
+			/* Initialize the row filter and column list info */
 			pgoutput_row_filter_init(data, rel_publications, entry);
-
-			/* Initialize the column list */
-			pgoutput_column_list_init(data, rel_publications, entry);
 		}
 
 		list_free(pubids);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a771ab8ff33..1a3f4f34bf4 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -104,6 +104,7 @@ typedef struct LogicalRepRelation
 	char	   *relname;		/* relation name */
 	int			natts;			/* number of columns */
 	char	  **attnames;		/* column names */
+	int16	   *attnums;		/* column attnums */
 	Oid		   *atttyps;		/* column types */
 	char		replident;		/* replica identity */
 	char		relkind;		/* remote relation kind */
@@ -209,12 +210,14 @@ extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *newslot,
-									bool binary, Bitmapset *columns);
+									bool binary, Bitmapset *schema_columns,
+									Bitmapset *columns);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *oldslot,
-									TupleTableSlot *newslot, bool binary, Bitmapset *columns);
+									TupleTableSlot *newslot, bool binary,
+									Bitmapset *schema_columns, Bitmapset *columns);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);

Reply via email to