Hi,

so I've been looking at tweaking the code so that the behavior matches
Alvaro's expectations. It passes check-world, but I'm not claiming it's
anywhere near committable - the purpose is mostly to give a better idea
of how invasive the change is etc.

As described earlier, this abandons the idea of building a single OR
expression from all the row filters (per action), and replaces that with
a list of per-publication info (struct PublicationInfo), combining info
about both row filters and column lists.

This means we can't initialize the row filters and column lists
separately; both have to be initialized at the same time. So
pgoutput_row_filter_init was modified to initialize both, and
pgoutput_column_list_init was removed.

With this info, we can calculate column lists only for publications with
matching row filters, which is what the modified pgoutput_row_filter
does (the calculated column list is returned through a parameter).
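
To illustrate the idea outside of pgoutput, here is a minimal standalone
sketch. It is not the patch code: ColSet (a plain bitmask) stands in for
Bitmapset, a function pointer stands in for the ExprState, and PubInfo,
Row, calc_column_list and the two example filters are names made up for
this illustration. It also ignores pubactions and the old/new tuple
handling for updates.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for Bitmapset: bit i set means attribute number i is listed. */
typedef uint32_t ColSet;
#define ALL_COLUMNS UINT32_MAX

/* Hypothetical row with three integer attributes a, b, c (attnums 1..3). */
typedef struct Row
{
	int			a;
	int			b;
	int			c;
} Row;

/* Stand-in for an ExprState row filter; NULL means "no row filter". */
typedef bool (*RowFilter) (const Row *row);

/* Simplified analogue of the per-publication info kept in the patch. */
typedef struct PubInfo
{
	RowFilter	rowfilter;		/* NULL = publication has no row filter */
	ColSet		columns;		/* ALL_COLUMNS = no column list */
} PubInfo;

/*
 * Returns true if at least one publication matches the row. *columns is set
 * to the union of the column lists of the matching publications only.
 */
bool
calc_column_list(const PubInfo *pubs, size_t npubs, const Row *row,
				 ColSet *columns)
{
	bool		matching = false;

	assert(pubs != NULL || npubs == 0);

	*columns = 0;
	for (size_t i = 0; i < npubs; i++)
	{
		/* filter does not match, so ignore this publication's column list */
		if (pubs[i].rowfilter && !pubs[i].rowfilter(row))
			continue;

		matching = true;
		*columns |= pubs[i].columns;
	}

	return matching;
}

/* Hypothetical filters for a two-publication case: (a,b) vs. (a,c). */
bool
a_positive(const Row *row)
{
	return row->a > 0;
}

bool
a_negative(const Row *row)
{
	return row->a < 0;
}
```

With one publication using a_positive and column list (a,b), and another
using a_negative and (a,c), a row with a = 1 yields (a,b), a row with
a = -1 yields (a,c), and a row with a = 0 is not replicated at all.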


This however does not remove the 'columns' from RelationSyncEntry
entirely. We still need that "superset" column list when sending schema.

Imagine two publications, one replicating (a,b) and the other (a,c),
maybe depending on row filter. send_relation_and_attrs() needs to send
info about all three attributes (a,b,c), i.e. about any attribute that
might end up being replicated.

We might try to be smarter and send the exact schema needed by the next
operation, i.e. when inserting (a,b) we'd make sure the last schema we
sent was (a,b) and invalidate/resend it otherwise. But that might easily
result in "thrashing" where we send the schema and the next operation
invalidates it right away because it needs a different schema.

But there's another reason to do it like this - it seems desirable to
actually reset columns that don't match the calculated column list. Using
Alvaro's example, it seems reasonable to expect these two transactions
to produce the same result on the subscriber:

1) insert (a,b) + update to (a,c)

  insert into uno values (1, 2, 3);
  update uno set a = -1 where a = 1;

2) insert (a,c)

  insert into uno values (-1, 2, 3);

But to do this, the update actually needs to send (-1,NULL,3).

So in this case we'll have the (a,b,c) column list in RelationSyncEntry,
and only attributes on this list will be sent as part of the schema. For
DML actions we'll calculate either (a,b) or (a,c) depending on the row
filter, and missing attributes will be replicated as NULL.
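
A rough standalone sketch of that tuple-writing rule, again not the patch
code itself: ColSet is a bitmask standing in for Bitmapset, and OutField
and write_tuple are hypothetical names. Unlike the patch, where a NULL
bitmapset means "all columns", the sketch expects explicit masks.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for Bitmapset: bit i set means attribute number i is listed. */
typedef uint32_t ColSet;

/* One output field: either a value or an explicit NULL marker. */
typedef struct OutField
{
	int			attnum;
	bool		isnull;
	int			value;
} OutField;

/*
 * Emit one field per attribute in schema_columns. Attributes outside the
 * schema are skipped entirely; attributes in the schema but not in the
 * per-operation column list are emitted as NULL.
 *
 * Returns the number of fields written to out, which must have room for
 * natts entries.
 */
size_t
write_tuple(const int *values, const bool *isnull, int natts,
			ColSet schema_columns, ColSet columns, OutField *out)
{
	size_t		n = 0;

	assert(natts >= 0 && natts < 32);

	for (int i = 0; i < natts; i++)
	{
		int			attnum = i + 1; /* attribute numbers are 1-based */

		/* not part of the schema sent to the subscriber, skip it */
		if (!(schema_columns & (1u << attnum)))
			continue;

		out[n].attnum = attnum;
		out[n].isnull = isnull[i] || !(columns & (1u << attnum));
		out[n].value = out[n].isnull ? 0 : values[i];
		n++;
	}

	return n;
}
```

For the update in the example above, the new tuple is (-1,2,3), the schema
is (a,b,c) and the calculated column list is (a,c), so the three fields
written are -1, NULL and 3.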


I haven't done any tests of how this affects performance, but I have a
couple of thoughts regarding that:

a) I kinda doubt the optimizations would really matter in practice,
because how likely is it that one relation is in many publications (in
the same subscription)?

b) Did anyone actually do some benchmarks that I could repeat, to see
how much worse this is?

c) AFAICS we could optimize this in at least some common cases. For
example we could combine the entries with matching row filters, and/or
column filters.
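
To make (c) a bit more concrete, a possible sketch of merging entries with
identical row filters. This is only an illustration under assumptions: the
filters are compared as serialized strings, all entries are assumed to
publish the same actions, and PubEntry, same_filter and merge_entries are
made-up names, not anything in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-in for Bitmapset: bit i set means attribute number i is listed. */
typedef uint32_t ColSet;

/* Simplified per-publication entry (hypothetical, not the patch struct). */
typedef struct PubEntry
{
	const char *filter;			/* serialized row filter, NULL = no filter */
	ColSet		columns;		/* column list as a bitmask */
} PubEntry;

/* Two filters are "the same" if both are NULL or the strings are equal. */
static int
same_filter(const char *a, const char *b)
{
	if (a == NULL || b == NULL)
		return a == b;
	return strcmp(a, b) == 0;
}

/*
 * Merge entries with identical row filters in place, taking the union of
 * their column lists. Returns the new number of entries.
 *
 * This is equivalent to evaluating the entries separately, because entries
 * sharing a filter always match (or fail to match) together.
 */
size_t
merge_entries(PubEntry *entries, size_t n)
{
	size_t		nout = 0;

	assert(entries != NULL || n == 0);

	for (size_t i = 0; i < n; i++)
	{
		size_t		j;

		for (j = 0; j < nout; j++)
		{
			if (same_filter(entries[j].filter, entries[i].filter))
			{
				entries[j].columns |= entries[i].columns;
				break;
			}
		}

		/* no earlier entry with this filter, keep it as a new one */
		if (j == nout)
			entries[nout++] = entries[i];
	}

	return nout;
}
```

This only reduces the number of filter evaluations when publications
actually share filters, so whether it's worth it depends on the answer
to (a).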


regards

-- 
Tomas Vondra
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index ff8513e2d2..41c5e3413f 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -33,7 +33,9 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel,
 								   Bitmapset *columns);
 static void logicalrep_write_tuple(StringInfo out, Relation rel,
 								   TupleTableSlot *slot,
-								   bool binary, Bitmapset *columns);
+								   bool binary,
+								   Bitmapset *schema_columns,
+								   Bitmapset *columns);
 static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel);
 static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple);
 
@@ -412,7 +414,8 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  */
 void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
-						TupleTableSlot *newslot, bool binary, Bitmapset *columns)
+						TupleTableSlot *newslot, bool binary,
+						Bitmapset *schema_columns, Bitmapset *columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
@@ -424,7 +427,8 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	pq_sendint32(out, RelationGetRelid(rel));
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns);
+	logicalrep_write_tuple(out, rel, newslot, binary,
+						   schema_columns, columns);
 }
 
 /*
@@ -457,7 +461,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
 void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 						TupleTableSlot *oldslot, TupleTableSlot *newslot,
-						bool binary, Bitmapset *columns)
+						bool binary, Bitmapset *schema_columns,
+						Bitmapset *columns)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
@@ -478,11 +483,12 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
+		logicalrep_write_tuple(out, rel, oldslot, binary, NULL, NULL);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns);
+	logicalrep_write_tuple(out, rel, newslot, binary,
+						   schema_columns, columns);
 }
 
 /*
@@ -551,7 +557,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldslot, binary, NULL);
+	logicalrep_write_tuple(out, rel, oldslot, binary, NULL, NULL);
 }
 
 /*
@@ -766,7 +772,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp)
  */
 static void
 logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
-					   bool binary, Bitmapset *columns)
+					   bool binary,
+					   Bitmapset *schema_columns, Bitmapset *columns)
 {
 	TupleDesc	desc;
 	Datum	   *values;
@@ -783,7 +790,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
-		if (!column_in_column_list(att->attnum, columns))
+		if (!column_in_column_list(att->attnum, schema_columns))
 			continue;
 
 		nliveatts++;
@@ -804,10 +811,23 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
 		if (att->attisdropped || att->attgenerated)
 			continue;
 
-		if (!column_in_column_list(att->attnum, columns))
+		/*
+		 * Columns that are not in schema (union of column lists) should
+		 * be skipped entirely.
+		 */
+		if (!column_in_column_list(att->attnum, schema_columns))
 			continue;
 
-		if (isnull[i])
+		/*
+		 * Columns not in the column list (derived consindering row filters)
+		 * we just send NULL.
+		 *
+		 * XXX Not sure this is quite correct, though. Imagine you replicate
+		 * values for columns (A,B), but it changes the row filter. Can we
+		 * send NULL that would overwrite "proper" value replicated earlier?
+		 */
+		if (isnull[i] ||
+			!column_in_column_list(att->attnum, columns))
 		{
 			pq_sendbyte(out, LOGICALREP_COLUMN_NULL);
 			continue;
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b197bfd565..85456e6d9f 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -137,13 +137,13 @@ typedef struct RelationSyncEntry
 	PublicationActions pubactions;
 
 	/*
-	 * ExprState array for row filter. Different publication actions don't
-	 * allow multiple expressions to always be combined into one, because
-	 * updates or deletes restrict the column in expression to be part of the
-	 * replica identity index whereas inserts do not have this restriction, so
-	 * there is one ExprState per publication action.
+	 * Info about row filters and column lists for each publication this
+	 * relation is included in. We keep a list with per-publication info in
+	 * order to calculate appropriate column list depending on which row
+	 * filter(s) match. Each element contains an ExprState for the filter
+	 * and column list.
 	 */
-	ExprState  *exprstate[NUM_ROWFILTER_PUBACTIONS];
+	dlist_head	pubinfos;		/* one per publication */
 	EState	   *estate;			/* executor state used for row filter */
 	TupleTableSlot *new_slot;	/* slot for storing new tuple */
 	TupleTableSlot *old_slot;	/* slot for storing old tuple */
@@ -208,6 +208,29 @@ typedef struct PGOutputTxnData
 								 * been sent */
 }		PGOutputTxnData;
 
+/*
+ * Info used to track and evaluate row filters for each publication the
+ * relation is included in, and to calculate the column list.
+ */
+typedef struct PublicationInfo {
+
+	/* doubly-linked list */
+	dlist_node	node;
+
+	/* publication OID (XXX not really needed) */
+	Oid			oid;
+
+	/* row filter (expression state) */
+	ExprState  *rowfilter;
+
+	/* column list */
+	Bitmapset  *columns;
+
+	/* actions published by the publication */
+	PublicationActions	pubactions;
+
+} PublicationInfo;
+
 /* Map used to remember which relation schemas we sent. */
 static HTAB *RelationSyncCache = NULL;
 
@@ -235,12 +258,8 @@ static bool pgoutput_row_filter_exec_expr(ExprState *state,
 static bool pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 								TupleTableSlot **new_slot_ptr,
 								RelationSyncEntry *entry,
-								ReorderBufferChangeType *action);
-
-/* column list routines */
-static void pgoutput_column_list_init(PGOutputData *data,
-									  List *publications,
-									  RelationSyncEntry *entry);
+								ReorderBufferChangeType *action,
+								Bitmapset **column_list);
 
 /*
  * Specify output plugin callbacks
@@ -822,18 +841,24 @@ pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry)
 }
 
 /*
- * Initialize the row filter.
+ * Initialize the row filter and column list.
+ *
+ * Prepare information (ExprState, etc) used to evaluate per-publication row
+ * filters, and column lists.
+ *
+ * We also calculate a "total" column list as a union of all per-publication
+ * column lists, irrespective of row filters. This is used to send schema
+ * for the relsync entry, etc.
  */
 static void
 pgoutput_row_filter_init(PGOutputData *data, List *publications,
 						 RelationSyncEntry *entry)
 {
 	ListCell   *lc;
-	List	   *rfnodes[] = {NIL, NIL, NIL};	/* One per pubaction */
-	bool		no_filter[] = {false, false, false};	/* One per pubaction */
 	MemoryContext oldctx;
-	int			idx;
-	bool		has_filter = true;
+	bool		all_columns = false;
+
+	dlist_init(&entry->pubinfos);
 
 	/*
 	 * Find if there are any row filters for this relation. If there are, then
@@ -855,7 +880,12 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
 		Publication *pub = lfirst(lc);
 		HeapTuple	rftuple = NULL;
 		Datum		rfdatum = 0;
+		Datum		cfdatum = 0;
 		bool		pub_no_filter = false;
+		bool		pub_no_list = false;
+		Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
+
+		PublicationInfo *pubinfo = NULL;
 
 		if (pub->alltables)
 		{
@@ -865,6 +895,7 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
 			 * publications it does).
 			 */
 			pub_no_filter = true;
+			pub_no_list = true;
 		}
 		else
 		{
@@ -881,191 +912,75 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications,
 				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
 										  Anum_pg_publication_rel_prqual,
 										  &pub_no_filter);
+
+				/*
+				 * Lookup the column list attribute.
+				 *
+				 * Null indicates no list.
+				 */
+				cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
+										  Anum_pg_publication_rel_prattrs,
+										  &pub_no_list);
 			}
 			else
 			{
 				pub_no_filter = true;
+				pub_no_list = true;
 			}
 		}
 
-		if (pub_no_filter)
-		{
-			if (rftuple)
-				ReleaseSysCache(rftuple);
-
-			no_filter[PUBACTION_INSERT] |= pub->pubactions.pubinsert;
-			no_filter[PUBACTION_UPDATE] |= pub->pubactions.pubupdate;
-			no_filter[PUBACTION_DELETE] |= pub->pubactions.pubdelete;
-
-			/*
-			 * Quick exit if all the DML actions are publicized via this
-			 * publication.
-			 */
-			if (no_filter[PUBACTION_INSERT] &&
-				no_filter[PUBACTION_UPDATE] &&
-				no_filter[PUBACTION_DELETE])
-			{
-				has_filter = false;
-				break;
-			}
-
-			/* No additional work for this publication. Next one. */
-			continue;
-		}
-
-		/* Form the per pubaction row filter lists. */
-		if (pub->pubactions.pubinsert && !no_filter[PUBACTION_INSERT])
-			rfnodes[PUBACTION_INSERT] = lappend(rfnodes[PUBACTION_INSERT],
-												TextDatumGetCString(rfdatum));
-		if (pub->pubactions.pubupdate && !no_filter[PUBACTION_UPDATE])
-			rfnodes[PUBACTION_UPDATE] = lappend(rfnodes[PUBACTION_UPDATE],
-												TextDatumGetCString(rfdatum));
-		if (pub->pubactions.pubdelete && !no_filter[PUBACTION_DELETE])
-			rfnodes[PUBACTION_DELETE] = lappend(rfnodes[PUBACTION_DELETE],
-												TextDatumGetCString(rfdatum));
-
-		ReleaseSysCache(rftuple);
-	}							/* loop all subscribed publications */
-
-	/* Clean the row filter */
-	for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
-	{
-		if (no_filter[idx])
-		{
-			list_free_deep(rfnodes[idx]);
-			rfnodes[idx] = NIL;
-		}
-	}
-
-	if (has_filter)
-	{
-		Relation	relation = RelationIdGetRelation(entry->publish_as_relid);
-
 		pgoutput_ensure_entry_cxt(data, entry);
 
-		/*
-		 * Now all the filters for all pubactions are known. Combine them when
-		 * their pubactions are the same.
-		 */
 		oldctx = MemoryContextSwitchTo(entry->entry_cxt);
-		entry->estate = create_estate_for_relation(relation);
-		for (idx = 0; idx < NUM_ROWFILTER_PUBACTIONS; idx++)
-		{
-			List	   *filters = NIL;
-			Expr	   *rfnode;
 
-			if (rfnodes[idx] == NIL)
-				continue;
-
-			foreach(lc, rfnodes[idx])
-				filters = lappend(filters, stringToNode((char *) lfirst(lc)));
-
-			/* combine the row filter and cache the ExprState */
-			rfnode = make_orclause(filters);
-			entry->exprstate[idx] = ExecPrepareExpr(rfnode, entry->estate);
-		}						/* for each pubaction */
-		MemoryContextSwitchTo(oldctx);
-
-		RelationClose(relation);
-	}
-}
+		pubinfo = (PublicationInfo *) palloc0(sizeof(PublicationInfo));
 
-/*
- * Initialize the column list.
- */
-static void
-pgoutput_column_list_init(PGOutputData *data, List *publications,
-						  RelationSyncEntry *entry)
-{
-	ListCell   *lc;
+		pubinfo->oid = pub->oid;
+		pubinfo->pubactions = pub->pubactions;
 
-	/*
-	 * Find if there are any column lists for this relation. If there are,
-	 * build a bitmap merging all the column lists.
-	 *
-	 * All the given publication-table mappings must be checked.
-	 *
-	 * Multiple publications might have multiple column lists for this relation.
-	 *
-	 * FOR ALL TABLES and FOR ALL TABLES IN SCHEMA implies "don't use column
-	 * list" so it takes precedence.
-	 */
-	foreach(lc, publications)
-	{
-		Publication *pub = lfirst(lc);
-		HeapTuple	cftuple = NULL;
-		Datum		cfdatum = 0;
+		if (!pub_no_filter)
+		{
+			entry->estate = create_estate_for_relation(relation);
 
-		/*
-		 * Assume there's no column list. Only if we find pg_publication_rel
-		 * entry with a column list we'll switch it to false.
-		 */
-		bool		pub_no_list = true;
+			pubinfo->rowfilter
+				= ExecPrepareExpr(stringToNode(TextDatumGetCString(rfdatum)),
+								  entry->estate);
+		}
 
 		/*
-		 * If the publication is FOR ALL TABLES then it is treated the same as if
-		 * there are no column lists (even if other publications have a list).
+		 * Build the column list bitmap in the per-entry context.
+		 *
+		 * We need to merge column lists from all publications, so we
+		 * update the same bitmapset. If the column list is null, we
+		 * interpret it as replicating all columns.
 		 */
-		if (!pub->alltables)
+		pubinfo->columns = NULL;
+		if (!pub_no_list)	/* when not null */
 		{
-			/*
-			 * Check for the presence of a column list in this publication.
-			 *
-			 * Note: If we find no pg_publication_rel row, it's a publication
-			 * defined for a whole schema, so it can't have a column list, just
-			 * like a FOR ALL TABLES publication.
-			 */
-			cftuple = SearchSysCache2(PUBLICATIONRELMAP,
-									  ObjectIdGetDatum(entry->publish_as_relid),
-									  ObjectIdGetDatum(pub->oid));
-
-			if (HeapTupleIsValid(cftuple))
-			{
-				/*
-				 * Lookup the column list attribute.
-				 *
-				 * Note: We update the pub_no_list value directly, because if
-				 * the value is NULL, we have no list (and vice versa).
-				 */
-				cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple,
-										  Anum_pg_publication_rel_prattrs,
-										  &pub_no_list);
-
-				/*
-				 * Build the column list bitmap in the per-entry context.
-				 *
-				 * We need to merge column lists from all publications, so we
-				 * update the same bitmapset. If the column list is null, we
-				 * interpret it as replicating all columns.
-				 */
-				if (!pub_no_list)	/* when not null */
-				{
-					pgoutput_ensure_entry_cxt(data, entry);
+			pubinfo->columns = pub_collist_to_bitmapset(pubinfo->columns,
+														cfdatum,
+														entry->entry_cxt);
 
-					entry->columns = pub_collist_to_bitmapset(entry->columns,
-															  cfdatum,
-															  entry->entry_cxt);
-				}
-			}
+			entry->columns = pub_collist_to_bitmapset(entry->columns,
+													  cfdatum,
+													  entry->entry_cxt);
 		}
+		else
+			all_columns = true;
 
-		/*
-		 * Found a publication with no column list, so we're done. But first
-		 * discard column list we might have from preceding publications.
-		 */
-		if (pub_no_list)
-		{
-			if (cftuple)
-				ReleaseSysCache(cftuple);
+		if (HeapTupleIsValid(rftuple))
+			ReleaseSysCache(rftuple);
 
-			bms_free(entry->columns);
-			entry->columns = NULL;
+		MemoryContextSwitchTo(oldctx);
+		RelationClose(relation);
 
-			break;
-		}
+		dlist_push_tail(&entry->pubinfos, &pubinfo->node);
 
-		ReleaseSysCache(cftuple);
 	}							/* loop all subscribed publications */
+
+	/* any of the publications replicates all columns */
+	if (all_columns)
+		entry->columns = NULL;
 }
 
 /*
@@ -1115,7 +1030,8 @@ init_tuple_slot(PGOutputData *data, Relation relation,
 }
 
 /*
- * Change is checked against the row filter if any.
+ * Check the change against the row filter, if any, and calculate the column
+ * list applicable to the operation (with respect to matching row filters).
  *
  * Returns true if the change is to be replicated, else false.
  *
@@ -1136,6 +1052,8 @@ init_tuple_slot(PGOutputData *data, Relation relation,
  *
  * The new action is updated in the action parameter.
  *
+ * The calculated column list is returned in the column_list parameter.
+ *
  * The new slot could be updated when transforming the UPDATE into INSERT,
  * because the original new tuple might not have column values from the replica
  * identity.
@@ -1167,17 +1085,21 @@ init_tuple_slot(PGOutputData *data, Relation relation,
 static bool
 pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 					TupleTableSlot **new_slot_ptr, RelationSyncEntry *entry,
-					ReorderBufferChangeType *action)
+					ReorderBufferChangeType *action, Bitmapset **column_list)
 {
 	TupleDesc	desc;
 	int			i;
-	bool		old_matched,
-				new_matched,
+	bool		old_matched_any = false,
+				new_matched_any = false,
 				result;
-	TupleTableSlot *tmp_new_slot;
+	TupleTableSlot *tmp_new_slot = NULL;
 	TupleTableSlot *new_slot = *new_slot_ptr;
-	ExprContext *ecxt;
-	ExprState  *filter_exprstate;
+	dlist_iter	iter;
+	bool		matching = false;
+
+	/* Column list calculated from publications matching the row filter. */
+	Bitmapset  *columns = NULL;
+	bool		all_columns = false;
 
 	/*
 	 * We need this map to avoid relying on ReorderBufferChangeType enums
@@ -1195,115 +1117,191 @@ pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 
 	Assert(new_slot || old_slot);
 
-	/* Get the corresponding row filter */
-	filter_exprstate = entry->exprstate[map_changetype_pubaction[*action]];
+	dlist_foreach(iter, &entry->pubinfos)
+	{
+		ExprContext *ecxt;
+		bool		old_matched,
+					new_matched;
 
-	/* Bail out if there is no row filter */
-	if (!filter_exprstate)
-		return true;
+		PublicationInfo  *pubinfo
+			= dlist_container(PublicationInfo, node, iter.cur);
 
-	elog(DEBUG3, "table \"%s.%s\" has row filter",
-		 get_namespace_name(RelationGetNamespace(relation)),
-		 RelationGetRelationName(relation));
+		/* ignore publications not replicating this action */
+		if ((*action == REORDER_BUFFER_CHANGE_INSERT) &&
+			(!pubinfo->pubactions.pubinsert))
+			continue;
+		else if ((*action == REORDER_BUFFER_CHANGE_UPDATE) &&
+				 (!pubinfo->pubactions.pubupdate))
+			continue;
+		else if ((*action == REORDER_BUFFER_CHANGE_DELETE) &&
+			(!pubinfo->pubactions.pubdelete))
+			continue;
 
-	ResetPerTupleExprContext(entry->estate);
+		if (!pubinfo->rowfilter)
+		{
+			matching = true;
 
-	ecxt = GetPerTupleExprContext(entry->estate);
+			/*
+			 * Update/merge the column list.
+			 *
+			 * If the publication has no column list, we interpret it as a list
+			 * with all columns. Otherwise we just add it to the bitmap.
+			 *
+			 * FIXME This is repeated in three places. Maybe refactor?
+			 */
+			if (!pubinfo->columns)
+			{
+				all_columns = true;
+				bms_free(columns);
+				columns = NULL;
+			}
+			else if (!all_columns)
+				columns = bms_union(columns, pubinfo->columns);
 
-	/*
-	 * For the following occasions where there is only one tuple, we can
-	 * evaluate the row filter for that tuple and return.
-	 *
-	 * For inserts, we only have the new tuple.
-	 *
-	 * For updates, we can have only a new tuple when none of the replica
-	 * identity columns changed and none of those columns have external data
-	 * but we still need to evaluate the row filter for the new tuple as the
-	 * existing values of those columns might not match the filter. Also, users
-	 * can use constant expressions in the row filter, so we anyway need to
-	 * evaluate it for the new tuple.
-	 *
-	 * For deletes, we only have the old tuple.
-	 */
-	if (!new_slot || !old_slot)
-	{
-		ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
-		result = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+			continue;
+		}
 
-		return result;
-	}
+		elog(DEBUG3, "table \"%s.%s\" has row filter",
+			get_namespace_name(RelationGetNamespace(relation)),
+			RelationGetRelationName(relation));
 
-	/*
-	 * Both the old and new tuples must be valid only for updates and need to
-	 * be checked against the row filter.
-	 */
-	Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
+		ResetPerTupleExprContext(entry->estate);
+
+		ecxt = GetPerTupleExprContext(entry->estate);
 
-	slot_getallattrs(new_slot);
-	slot_getallattrs(old_slot);
+		/*
+		 * For the following occasions where there is only one tuple, we can
+		 * evaluate the row filter for that tuple and return.
+		 *
+		 * For inserts, we only have the new tuple.
+		 *
+		 * For updates, we can have only a new tuple when none of the replica
+		 * identity columns changed and none of those columns have external data
+		 * but we still need to evaluate the row filter for the new tuple as the
+		 * existing values of those columns might not match the filter. Also, users
+		 * can use constant expressions in the row filter, so we anyway need to
+		 * evaluate it for the new tuple.
+		 *
+		 * For deletes, we only have the old tuple.
+		 */
+		if (!new_slot || !old_slot)
+		{
+			ecxt->ecxt_scantuple = new_slot ? new_slot : old_slot;
+			result = pgoutput_row_filter_exec_expr(pubinfo->rowfilter, ecxt);
 
-	tmp_new_slot = NULL;
-	desc = RelationGetDescr(relation);
+			matching |= result;
 
-	/*
-	 * The new tuple might not have all the replica identity columns, in which
-	 * case it needs to be copied over from the old tuple.
-	 */
-	for (i = 0; i < desc->natts; i++)
-	{
-		Form_pg_attribute att = TupleDescAttr(desc, i);
+			/*
+			 * FIXME refactor to reuse this code in multiple places
+			 * 
+			 * XXX Should this only update column list when using new slot?
+			 * If evaluating old slot, that's delete, no?
+			 */
+			if (result)
+			{
+				if (!pubinfo->columns)
+				{
+					all_columns = true;
+					bms_free(columns);
+					columns = NULL;
+				}
+				else if (!all_columns)
+				{
+					columns = bms_union(columns, pubinfo->columns);
+				}
+			}
+
+			continue;
+		}
 
 		/*
-		 * if the column in the new tuple or old tuple is null, nothing to do
+		 * Both the old and new tuples must be valid only for updates and need to
+		 * be checked against the row filter.
 		 */
-		if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
-			continue;
+		Assert(map_changetype_pubaction[*action] == PUBACTION_UPDATE);
+
+		slot_getallattrs(new_slot);
+		slot_getallattrs(old_slot);
+
+		tmp_new_slot = NULL;
+		desc = RelationGetDescr(relation);
 
 		/*
-		 * Unchanged toasted replica identity columns are only logged in the
-		 * old tuple. Copy this over to the new tuple. The changed (or WAL
-		 * Logged) toast values are always assembled in memory and set as
-		 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
+		 * The new tuple might not have all the replica identity columns, in which
+		 * case it needs to be copied over from the old tuple.
 		 */
-		if (att->attlen == -1 &&
-			VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
-			!VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
+		for (i = 0; i < desc->natts; i++)
 		{
-			if (!tmp_new_slot)
+			Form_pg_attribute att = TupleDescAttr(desc, i);
+
+			/*
+			 * if the column in the new tuple or old tuple is null, nothing to do
+			 */
+			if (new_slot->tts_isnull[i] || old_slot->tts_isnull[i])
+				continue;
+
+			/*
+			 * Unchanged toasted replica identity columns are only logged in the
+			 * old tuple. Copy this over to the new tuple. The changed (or WAL
+			 * Logged) toast values are always assembled in memory and set as
+			 * VARTAG_INDIRECT. See ReorderBufferToastReplace.
+			 */
+			if (att->attlen == -1 &&
+				VARATT_IS_EXTERNAL_ONDISK(new_slot->tts_values[i]) &&
+				!VARATT_IS_EXTERNAL_ONDISK(old_slot->tts_values[i]))
 			{
-				tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
-				ExecClearTuple(tmp_new_slot);
+				if (!tmp_new_slot)
+				{
+					tmp_new_slot = MakeSingleTupleTableSlot(desc, &TTSOpsVirtual);
+					ExecClearTuple(tmp_new_slot);
 
-				memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
-					   desc->natts * sizeof(Datum));
-				memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
-					   desc->natts * sizeof(bool));
-			}
+					memcpy(tmp_new_slot->tts_values, new_slot->tts_values,
+						   desc->natts * sizeof(Datum));
+					memcpy(tmp_new_slot->tts_isnull, new_slot->tts_isnull,
+						   desc->natts * sizeof(bool));
+				}
 
-			tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
-			tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
+				tmp_new_slot->tts_values[i] = old_slot->tts_values[i];
+				tmp_new_slot->tts_isnull[i] = old_slot->tts_isnull[i];
+			}
 		}
-	}
 
-	ecxt->ecxt_scantuple = old_slot;
-	old_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+		ecxt->ecxt_scantuple = old_slot;
 
-	if (tmp_new_slot)
-	{
-		ExecStoreVirtualTuple(tmp_new_slot);
-		ecxt->ecxt_scantuple = tmp_new_slot;
-	}
-	else
-		ecxt->ecxt_scantuple = new_slot;
+		old_matched = pgoutput_row_filter_exec_expr(pubinfo->rowfilter, ecxt);
+		old_matched_any |= old_matched;
+
+		if (tmp_new_slot)
+		{
+			ExecStoreVirtualTuple(tmp_new_slot);
+			ecxt->ecxt_scantuple = tmp_new_slot;
+		}
+		else
+			ecxt->ecxt_scantuple = new_slot;
 
-	new_matched = pgoutput_row_filter_exec_expr(filter_exprstate, ecxt);
+		new_matched = pgoutput_row_filter_exec_expr(pubinfo->rowfilter, ecxt);
+		new_matched_any |= new_matched;
 
-	/*
-	 * Case 1: if both tuples don't match the row filter, bailout. Send
-	 * nothing.
-	 */
-	if (!old_matched && !new_matched)
-		return false;
+		/*
+		 * Case 1: if both tuples don't match the row filter, bailout. Send
+		 * nothing.
+		 */
+		if (!old_matched && !new_matched)
+			continue;	/* continue with the next row filter */
+
+		/*
+		 * Case 4: if both tuples match the row filter, transformation isn't
+		 * required. (*action is default UPDATE).
+		 */
+		if (!pubinfo->columns)
+		{
+			all_columns = true;
+			bms_free(columns);
+			columns = NULL;
+		}
+		else if (!all_columns && new_matched)
+			columns = bms_union(columns, pubinfo->columns);
+	}
 
 	/*
 	 * Case 2: if the old tuple doesn't satisfy the row filter but the new
@@ -1314,9 +1312,10 @@ pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 	 * while inserting the tuple in the downstream node, we have all the
 	 * required column values.
 	 */
-	if (!old_matched && new_matched)
+	if (!old_matched_any && new_matched_any)
 	{
 		*action = REORDER_BUFFER_CHANGE_INSERT;
+		matching = true;
 
 		if (tmp_new_slot)
 			*new_slot_ptr = tmp_new_slot;
@@ -1329,15 +1328,18 @@ pgoutput_row_filter(Relation relation, TupleTableSlot *old_slot,
 	 * This transformation does not require another tuple. The Old tuple will
 	 * be used for DELETE.
 	 */
-	else if (old_matched && !new_matched)
+	else if (old_matched_any && !new_matched_any)
+	{
 		*action = REORDER_BUFFER_CHANGE_DELETE;
+		matching = true;
+	}
+	else if (old_matched_any && new_matched_any)
+		matching = true;
 
-	/*
-	 * Case 4: if both tuples match the row filter, transformation isn't
-	 * required. (*action is default UPDATE).
-	 */
+	if (column_list)
+		*column_list = columns;
 
-	return true;
+	return matching;
 }
 
 /*
@@ -1359,6 +1361,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	ReorderBufferChangeType action = change->action;
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
+	Bitmapset	   *columns = NULL;
 
 	if (!is_publishable_relation(relation))
 		return;
@@ -1423,7 +1426,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 			/* Check row filter */
 			if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
-									 &action))
+									 &action, &columns))
 				break;
 
 			/*
@@ -1444,7 +1447,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 			OutputPluginPrepareWrite(ctx, true);
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
-									data->binary, relentry->columns);
+									data->binary,
+									relentry->columns, columns);
 			OutputPluginWrite(ctx, true);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
@@ -1483,7 +1487,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 			/* Check row filter */
 			if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-									 relentry, &action))
+									 relentry, &action, &columns))
 				break;
 
 			/* Send BEGIN if we haven't yet */
@@ -1503,12 +1507,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_INSERT:
 					logicalrep_write_insert(ctx->out, xid, targetrel,
 											new_slot, data->binary,
-											relentry->columns);
+											relentry->columns, columns);
 					break;
 				case REORDER_BUFFER_CHANGE_UPDATE:
 					logicalrep_write_update(ctx->out, xid, targetrel,
 											old_slot, new_slot, data->binary,
-											relentry->columns);
+											relentry->columns, columns);
 					break;
 				case REORDER_BUFFER_CHANGE_DELETE:
 					logicalrep_write_delete(ctx->out, xid, targetrel,
@@ -1547,7 +1551,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				/* Check row filter */
 				if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-										 relentry, &action))
+										 relentry, &action, NULL))
 					break;
 
 				/* Send BEGIN if we haven't yet */
@@ -1977,7 +1981,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
 		entry->new_slot = NULL;
 		entry->old_slot = NULL;
-		memset(entry->exprstate, 0, sizeof(entry->exprstate));
+		dlist_init(&entry->pubinfos);
 		entry->entry_cxt = NULL;
 		entry->publish_as_relid = InvalidOid;
 		entry->columns = NULL;
@@ -2056,7 +2060,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 
 		entry->entry_cxt = NULL;
 		entry->estate = NULL;
-		memset(entry->exprstate, 0, sizeof(entry->exprstate));
+		dlist_init(&entry->pubinfos);
 
 		/*
 		 * Build publication cache. We can't use one provided by relcache as
@@ -2192,11 +2196,8 @@ get_rel_sync_entry(PGOutputData *data, Relation relation)
 			/* Initialize the tuple slot and map */
 			init_tuple_slot(data, relation, entry);
 
-			/* Initialize the row filter */
+			/* Initialize the row filter and column list info */
 			pgoutput_row_filter_init(data, rel_publications, entry);
-
-			/* Initialize the column list */
-			pgoutput_column_list_init(data, rel_publications, entry);
 		}
 
 		list_free(pubids);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a771ab8ff3..7d45b94d3c 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -209,12 +209,14 @@ extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
 extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *newslot,
-									bool binary, Bitmapset *columns);
+									bool binary, Bitmapset *schema_columns,
+									Bitmapset *columns);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *oldslot,
-									TupleTableSlot *newslot, bool binary, Bitmapset *columns);
+									TupleTableSlot *newslot, bool binary,
+									Bitmapset *schema_columns, Bitmapset *columns);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
