From 5f19b17c12ddcc6615afd8a9fdf435826b80d9e3 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Wed, 30 Apr 2025 06:23:03 -0400
Subject: [PATCH v18 2/3] Introduce an output plugin callback to filter changes

This new output plugin callback provides an option to logical decoding plugins to filter out
changes early. The primary purpose of the callback is to conserve memory and processing cycles
by excluding changes that are not required by output plugins.
---
 doc/src/sgml/logicaldecoding.sgml               | 41 ++++++++++++++-
 src/backend/replication/logical/decode.c        | 38 ++++++++++++--
 src/backend/replication/logical/logical.c       | 41 +++++++++++++++
 src/backend/replication/logical/reorderbuffer.c | 57 +++++++++++++++-----
 src/backend/replication/pgoutput/pgoutput.c     | 70 +++++++++++++++++++++++++
 src/include/replication/output_plugin.h         | 20 +++++++
 src/include/replication/reorderbuffer.h         | 12 ++++-
 src/test/subscription/t/013_partition.pl        |  7 +++
 8 files changed, 267 insertions(+), 19 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 3f2bcd4..788fe05 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -560,6 +560,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeCommitCB commit_cb;
     LogicalDecodeMessageCB message_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+    LogicalDecodeFilterChangeCB filter_change_cb;
     LogicalDecodeShutdownCB shutdown_cb;
     LogicalDecodeFilterPrepareCB filter_prepare_cb;
     LogicalDecodeBeginPrepareCB begin_prepare_cb;
@@ -582,8 +583,8 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      and <function>commit_cb</function> callbacks are required,
      while <function>startup_cb</function>, <function>truncate_cb</function>,
      <function>message_cb</function>, <function>filter_by_origin_cb</function>,
-     and <function>shutdown_cb</function> are optional.
-     If <function>truncate_cb</function> is not set but a
+     <function>shutdown_cb</function>, and <function>filter_change_cb</function>
+     are optional. If <function>truncate_cb</function> is not set but a
      <command>TRUNCATE</command> is to be decoded, the action will be ignored.
     </para>
 
@@ -871,6 +872,42 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
      </para>
      </sect3>
 
+     <sect3 id="logicaldecoding-output-plugin-filter-change">
+     <title>Change Filter Callback</title>
+
+     <para>
+      The optional <function>filter_change_cb</function> is called before a
+      change record is decoded to determine whether the change can be filtered
+      out.
+<programlisting>
+typedef bool (*LogicalDecodeFilterChangeCB) (struct LogicalDecodingContext *ctx,
+                                             Oid relid,
+                                             ReorderBufferChangeType change_type,
+                                             bool in_txn, bool *cache_valid);
+</programlisting>
+      Return <literal>true</literal> to indicate that a change of type
+      <parameter>change_type</parameter> on the relation identified by
+      <parameter>relid</parameter> can be skipped; <literal>false</literal> otherwise.
+      The <parameter>in_txn</parameter> parameter indicates whether the
+      callback is invoked within a transaction block.
+      When <parameter>in_txn</parameter> is <literal>false</literal> and the
+      decision requires being inside a transaction block (for example, because
+      the catalog must be accessed), set <parameter>*cache_valid</parameter>
+      to <literal>false</literal>; the callback will then be invoked again
+      once a transaction block has been started. If a decision can be made
+      immediately, set <parameter>*cache_valid</parameter> to <literal>true</literal>.
+     </para>
+     <para>
+      The primary purpose of this callback function is to optimize memory usage
+      and processing efficiency by filtering out changes that are unnecessary for
+      output plugins. It enables output plugins to selectively process relevant
+      changes. Caching filtering decisions locally is recommended, as it enables
+      the callback to provide cached results without repeatedly initiating
+      transactions or querying the catalog. This approach minimizes overhead
+      and improves efficiency during the decoding process.
+     </para>
+     </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-message">
      <title>Generic Message Callback</title>
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 261774b..77c8859 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -894,6 +894,18 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	UpdateDecodingStats(ctx);
 }
 
+/* Can this change be dropped before it is queued in the reorder buffer? */
+static inline bool
+FilterChange(LogicalDecodingContext *ctx, XLogRecPtr origptr, TransactionId xid,
+			 RelFileLocator *target_locator, ReorderBufferChangeType change_type)
+{
+	/* No plugin callback, or try_to_filter_change is unset: keep the change. */
+	if (!ctx->callbacks.filter_change_cb || !ctx->reorder->try_to_filter_change)
+		return false;
+	return ReorderBufferFilterByRelFileLocator(ctx->reorder, xid, origptr,
+											   target_locator, change_type);
+}
+
 /*
  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
  *
@@ -928,9 +940,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
-	if (ctx->reorder->try_to_filter_change &&
-			ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
-												buf->origptr, &target_locator))
+	/* Can the relation associated with this change be skipped? */
+	if (FilterChange(ctx, buf->origptr, XLogRecGetXid(r), &target_locator,
+						REORDER_BUFFER_CHANGE_INSERT))
 		return;
 
 	change = ReorderBufferAllocChange(ctx->reorder);
@@ -983,6 +995,11 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
+	/* Can the relation associated with this change be skipped? */
+	if (FilterChange(ctx, buf->origptr, XLogRecGetXid(r), &target_locator,
+					   REORDER_BUFFER_CHANGE_UPDATE))
+		return;
+
 	change = ReorderBufferAllocChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_UPDATE;
 	change->origin_id = XLogRecGetOrigin(r);
@@ -1049,6 +1066,11 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
+	/* Can the relation associated with this change be skipped? */
+	if (FilterChange(ctx, buf->origptr, XLogRecGetXid(r), &target_locator,
+					   REORDER_BUFFER_CHANGE_DELETE))
+		return;
+
 	change = ReorderBufferAllocChange(ctx->reorder);
 
 	if (xlrec->flags & XLH_DELETE_IS_SUPER)
@@ -1151,6 +1173,11 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
+	/* Can the relation associated with this change be skipped? */
+	if (FilterChange(ctx, buf->origptr, XLogRecGetXid(r), &rlocator,
+					   REORDER_BUFFER_CHANGE_INSERT))
+		return;
+
 	/*
 	 * We know that this multi_insert isn't for a catalog, so the block should
 	 * always have data even if a full-page write of it is taken.
@@ -1242,6 +1269,11 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
+	/* Can the relation associated with this change be skipped? */
+	if (FilterChange(ctx, buf->origptr, XLogRecGetXid(r), &target_locator,
+						REORDER_BUFFER_CHANGE_INSERT))
+		return;
+
 	change = ReorderBufferAllocChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
 	change->origin_id = XLogRecGetOrigin(r);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index a8d2e02..09ee1df 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -74,6 +74,9 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr message_lsn, bool transactional,
 							   const char *prefix, Size message_size, const char *message);
+static bool filter_change_cb_wrapper(ReorderBuffer *cache, Oid relid,
+								  ReorderBufferChangeType change_type, bool in_txn,
+								  bool *cache_valid);
 
 /* streaming callbacks */
 static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -220,6 +223,7 @@ StartupDecodingContext(List *output_plugin_options,
 	/* wrap output plugin callbacks, so we can add error context information */
 	ctx->reorder->begin = begin_cb_wrapper;
 	ctx->reorder->apply_change = change_cb_wrapper;
+	ctx->reorder->filter_change = filter_change_cb_wrapper;
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
@@ -1224,6 +1228,43 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+/* Error-context wrapper around the output plugin's filter_change callback. */
+static bool
+filter_change_cb_wrapper(ReorderBuffer *cache, Oid relid,
+						 ReorderBufferChangeType change_type, bool in_txn,
+						 bool *cache_valid)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	Assert(!ctx->fast_forward);
+	*cache_valid = true;		/* "don't filter" is final if there's no plugin */
+
+	if (ctx->callbacks.filter_change_cb == NULL)
+		return false;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_change";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+	ctx->end_xact = false;
+
+	/* do the actual work: call callback, then pop the error context stack */
+	ret = ctx->callbacks.filter_change_cb(ctx, relid, change_type, in_txn, cache_valid);
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 static void
 message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				   XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c7e063a..cd2b052 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -5703,17 +5703,19 @@ ReorderBufferMemoryResetcallback(void *arg)
  * management, the results are cached in the 'RelFileLocatorFilterCache' hash
  * table.
  *
- * Returns true if the relation can be filtered; otherwise, false.
+ * Returns true if this change_type can be filtered; otherwise, false.
  */
 bool
 ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
-									 XLogRecPtr lsn, RelFileLocator *rlocator)
+									XLogRecPtr lsn, RelFileLocator *rlocator,
+									ReorderBufferChangeType change_type)
 
 {
-	bool		found;
-	Relation	relation;
-	bool		using_subtxn;
-	Snapshot	snapshot_now;
+	bool        found;
+	bool		cache_valid;
+	Relation    relation;
+	bool        using_subtxn;
+	Snapshot    snapshot_now;
 	ReorderBufferTXN *txn,
 					 *toptxn;
 	ReorderBufferRelFileLocatorEnt *entry;
@@ -5724,7 +5726,6 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 	Assert(txn);
 	toptxn = rbtxn_get_toptxn(txn);
-	rb->try_to_filter_change = false;
 
 	/*
 	 * We cannot construct an accurate historical snapshot until all pending
@@ -5741,7 +5742,26 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 	if (found)
 	{
 		rb->try_to_filter_change = entry->filterable;
-		return entry->filterable;
+
+		/*
+		 * Quick return if we already know that the relation is not to be
+		 * decoded. This is for special relations that are unlogged and for
+		 * sequences and catalogs.
+		 */
+		if (entry->filterable)
+			return true;
+
+		/* Allow the output plugin to filter relations */
+		rb->try_to_filter_change = rb->filter_change(rb, entry->relid, change_type,
+													 false, &cache_valid);
+
+		/*
+		 * If plugin had the relation ready in cache, the response is valid,
+		 * else we'll need to call the plugin a second time within a
+		 * transaction.
+		 */
+		if (cache_valid)
+			return rb->try_to_filter_change;
 	}
 
 	/* constructs a temporary historical snapshot */
@@ -5765,18 +5785,29 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 
 	if (RelationIsValid(relation))
 	{
-		entry->relid = RelationGetRelid(relation);
+		if (IsToastRelation(relation))
+		{
+			char   *toast_name = RelationGetRelationName(relation);
+			int     n PG_USED_FOR_ASSERTS_ONLY;
+
+			n = sscanf(toast_name, "pg_toast_%u", &entry->relid);
+
+			Assert(n == 1);
+		}
+		else
+			entry->relid = RelationGetRelid(relation);
+
 		entry->filterable = false;
+		rb->try_to_filter_change = rb->filter_change(rb, entry->relid, change_type,
+												  true, &cache_valid);
 		RelationClose(relation);
 	}
 	else
 	{
 		entry->relid = InvalidOid;
-		entry->filterable = true;
+		rb->try_to_filter_change = entry->filterable = true;
 	}
 
-	rb->try_to_filter_change = entry->filterable;
-
 	ReorderBufferFreeSnap(rb, snapshot_now);
 
 	TeardownHistoricSnapshot(false);
@@ -5792,7 +5823,7 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 	if (using_subtxn)
 		RollbackAndReleaseCurrentSubTransaction();
 
-	return entry->filterable;
+	return rb->try_to_filter_change;
 }
 
 /*
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 693a766..c356423 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -60,6 +60,9 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static bool pgoutput_filter_change(LogicalDecodingContext *ctx, Oid relid,
+								   ReorderBufferChangeType change_type, bool in_txn,
+								   bool *cache_valid);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn);
 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -271,6 +274,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
 	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
+	cb->filter_change_cb = pgoutput_filter_change;
 	cb->shutdown_cb = pgoutput_shutdown;
 
 	/* transaction streaming */
@@ -1749,6 +1753,72 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 }
 
 /*
+ * Return true if a change of 'change_type' on 'relid' is not published and
+ * can be skipped; '*cache_valid' reports whether that answer is final.
+ * See the comments atop LogicalDecodeFilterChangeCB for details.
+ */
+static bool
+pgoutput_filter_change(LogicalDecodingContext *ctx, Oid relid,
+					   ReorderBufferChangeType change_type, bool in_txn, bool *cache_valid)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	RelationSyncEntry *entry;
+
+	Assert(RelationSyncCache != NULL);
+
+	if (in_txn)
+	{
+		Relation	relation = RelationIdGetRelation(relid);
+
+		/* Release the relcache reference taken above; it is not used past here. */
+		entry = get_rel_sync_entry(data, relation);
+		RelationClose(relation);
+		*cache_valid = true;
+	}
+	else
+	{
+		entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
+												  &relid,
+												  HASH_FIND, cache_valid);
+		/* A missing or invalidated entry must be rebuilt inside a transaction. */
+		*cache_valid = *cache_valid && entry->replicate_valid;
+		if (!*cache_valid)
+			return false;
+	}
+
+	/* Skip the change if this action is not published for the relation. */
+	switch (change_type)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			if (!entry->pubactions.pubinsert)
+			{
+				elog(DEBUG1, "Filtering INSERT");
+				return true;
+			}
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			if (!entry->pubactions.pubupdate)
+			{
+				elog(DEBUG1, "Filtering UPDATE");
+				return true;
+			}
+			break;
+		case REORDER_BUFFER_CHANGE_DELETE:
+			if (!entry->pubactions.pubdelete)
+			{
+				elog(DEBUG1, "Filtering DELETE");
+				return true;
+			}
+			break;
+		default:
+			/* allow any other changes that are not explicitly filtered */
+			return false;
+	}
+
+	return false;
+}
+
+/*
  * Shutdown the output plugin.
  *
  * Note, we don't need to clean the data->context, data->cachectx, and
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 8d4d5b71..f82bc1d 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -97,6 +97,25 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
 											   RepOriginId origin_id);
 
 /*
+ * This callback is called before a change record is decoded to determine
+ * whether the change can be filtered out before entering the reorder buffer.
+ * It returns true if the change can be skipped, false otherwise.
+ *
+ * Typically, the output plugin needs to refer to the system catalog to make
+ * this decision. To enhance efficiency, the plugin should cache the lookup
+ * result for each relation, minimizing catalog access on subsequent calls.
+ *
+ * If 'in_txn' is false (the reorder buffer has not started a transaction) and
+ * the answer is not in the plugin's cache, set '*cache_valid' to false so the
+ * callback is invoked again inside a transaction; otherwise set it to true.
+ * If 'in_txn' is true, the plugin may query the catalog and cache the result.
+ */
+typedef bool (*LogicalDecodeFilterChangeCB) (struct LogicalDecodingContext *ctx,
+											 Oid relid,
+											 ReorderBufferChangeType change_type,
+											 bool in_txn, bool *cache_valid);
+
+/*
  * Called to shutdown an output plugin.
  */
 typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
@@ -222,6 +241,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+	LogicalDecodeFilterChangeCB filter_change_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
 	/* streaming of changes at prepare time */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 363d9cd..0131022 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -494,6 +494,12 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										const char *prefix, Size sz,
 										const char *message);
 
+/* filter change callback signature */
+typedef bool (*ReorderBufferFilterChangeCB) (ReorderBuffer *rb,
+											 Oid relid,
+											 ReorderBufferChangeType change_type,
+											 bool in_txn, bool *cache_valid);
+
 /* begin prepare callback signature */
 typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
 											 ReorderBufferTXN *txn);
@@ -604,6 +610,7 @@ struct ReorderBuffer
 	 */
 	ReorderBufferBeginCB begin;
 	ReorderBufferApplyChangeCB apply_change;
+	ReorderBufferFilterChangeCB filter_change;
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
 	ReorderBufferMessageCB message;
@@ -779,6 +786,9 @@ extern uint32 ReorderBufferGetInvalidations(ReorderBuffer *rb,
 extern void StartupReorderBuffer(void);
 
 extern bool ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
-												XLogRecPtr lsn, RelFileLocator *rlocator);
+												XLogRecPtr lsn, RelFileLocator *rlocator,
+												ReorderBufferChangeType change_type);
+
+extern bool ReorderBufferCanFilterChanges(ReorderBuffer *rb);
 
 #endif
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 61b0cb4..a64b27e 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -472,6 +472,13 @@ $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab4 (a int PRIMARY KEY)");
 $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab4_1 (a int PRIMARY KEY)");
+
+# Ensure that the subscription 'sub2' catches up with the latest changes. This
+# is necessary for the walsender to update its historic snapshot. Otherwise,
+# the walsender might retain an outdated snapshot, potentially preventing it
+# from accessing the newly created publication.
+$node_publisher->wait_for_catchup('sub2');
+
 # Since we specified publish_via_partition_root in pub_all and
 # pub_lower_level, all partition tables use their root tables' identity and
 # schema. We set the list of publications so that the FOR ALL TABLES
-- 
1.8.3.1

