From 4f3f09618e91d9a0e36bfa3163a64384df6ee4d2 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Fri, 24 Jan 2025 00:40:53 -0500
Subject: [PATCH v12 3/3] Introduce an output plugin callback to filter changes

This new output plugin callback provides an option to logical decoding plugins to filter out
changes early. The primary purpose of the callback is to conserve memory and processing cycles
by excluding changes that are not required by output plugins.
---
 doc/src/sgml/logicaldecoding.sgml               | 41 ++++++++++++++++++-
 src/backend/replication/logical/decode.c        | 22 +++++++++-
 src/backend/replication/logical/logical.c       | 42 +++++++++++++++++++
 src/backend/replication/logical/reorderbuffer.c | 45 +++++++++++++++++----
 src/backend/replication/pgoutput/pgoutput.c     | 54 +++++++++++++++++++++++++
 src/include/replication/output_plugin.h         | 20 +++++++++
 src/include/replication/reorderbuffer.h         |  8 ++++
 src/test/subscription/t/013_partition.pl        |  7 ++++
 8 files changed, 228 insertions(+), 11 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 1c4ae38..a603c47 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -560,6 +560,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeCommitCB commit_cb;
     LogicalDecodeMessageCB message_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+    LogicalDecodeFilterChangeCB filter_change_cb;
     LogicalDecodeShutdownCB shutdown_cb;
     LogicalDecodeFilterPrepareCB filter_prepare_cb;
     LogicalDecodeBeginPrepareCB begin_prepare_cb;
@@ -582,8 +583,8 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      and <function>commit_cb</function> callbacks are required,
      while <function>startup_cb</function>, <function>truncate_cb</function>,
      <function>message_cb</function>, <function>filter_by_origin_cb</function>,
-     and <function>shutdown_cb</function> are optional.
-     If <function>truncate_cb</function> is not set but a
+     <function>shutdown_cb</function>, and <function>filter_change_cb</function>
+     are optional. If <function>truncate_cb</function> is not set but a
      <command>TRUNCATE</command> is to be decoded, the action will be ignored.
     </para>
 
@@ -871,6 +872,42 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
      </para>
      </sect3>
 
+     <sect3 id="logicaldecoding-output-plugin-filter-change">
+     <title>Change Filter Callback</title>
+
+     <para>
+      The optional <function>filter_change_cb</function> is called before a
+      change record is decoded to determine whether the change can be filtered
+      out.
+<programlisting>
+typedef bool (*LogicalDecodeFilterChangeCB) (struct LogicalDecodingContext *ctx,
+                                             Oid relid,
+                                             ReorderBufferChangeType change_type,
+                                             bool in_txn, bool *cache_valid);
+</programlisting>
+      To indicate that decoding can be skipped for a change of type
+      <parameter>change_type</parameter> to relation <parameter>relid</parameter>,
+      return <literal>true</literal>; otherwise return <literal>false</literal>.
+      The <parameter>in_txn</parameter> parameter indicates whether the
+      callback is invoked within a transaction block. When it is
+      <literal>false</literal> and the decision requires being inside a
+      transaction block (for example, to access the catalog), set
+      <parameter>*cache_valid</parameter> to <literal>false</literal> so that
+      the callback is invoked again once a transaction block has started.
+      If a decision can be made immediately, set
+      <parameter>*cache_valid</parameter> to <literal>true</literal>.
+     </para>
+     <para>
+      The primary purpose of this callback function is to optimize memory usage
+      and processing efficiency by filtering out changes that are unnecessary for
+      output plugins. It enables output plugins to selectively process relevant
+      changes. Caching filtering decisions locally is recommended, as it enables
+      the callback to provide cached results without repeatedly initiating
+      transactions or querying the catalog. This approach minimizes overhead
+      and improves efficiency during the decoding process.
+     </para>
+     </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-message">
      <title>Generic Message Callback</title>
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c5f1083..7c01e7c 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -576,6 +576,17 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 }
 
 /*
+ * Return true if the output plugin provides filter_change_cb and the reorder
+ * buffer currently permits filtering changes before they are decoded.
+ */
+static inline bool
+FilterChangeIsEnabled(LogicalDecodingContext *ctx)
+{
+	return (ctx->reorder->can_filter_change &&
+			ctx->callbacks.filter_change_cb != NULL);
+}
+
+/*
  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
  */
 void
@@ -915,9 +926,16 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
-	if (ctx->reorder->can_filter_change &&
+	/*
+	 * When filtering changes, determine if the relation associated with the change
+	 * can be skipped. This could be because the relation is unlogged or because
+	 * the plugin has opted to exclude this relation from decoding.
+	 */
+	if (FilterChangeIsEnabled(ctx) &&
 		ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
-											buf->origptr, &target_locator, true))
+											buf->origptr, &target_locator,
+											REORDER_BUFFER_CHANGE_INSERT,
+											true))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 0b25efa..adc5383 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -74,6 +74,9 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr message_lsn, bool transactional,
 							   const char *prefix, Size message_size, const char *message);
+static bool filter_change_cb_wrapper(ReorderBuffer *cache, Oid relid,
+								  ReorderBufferChangeType change_type, bool in_txn,
+								  bool *cache_valid);
 
 /* streaming callbacks */
 static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -220,6 +223,7 @@ StartupDecodingContext(List *output_plugin_options,
 	/* wrap output plugin callbacks, so we can add error context information */
 	ctx->reorder->begin = begin_cb_wrapper;
 	ctx->reorder->apply_change = change_cb_wrapper;
+	ctx->reorder->filter_change = filter_change_cb_wrapper;
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
@@ -1243,6 +1247,59 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+/*
+ * Wrapper around the output plugin's filter_change_cb, installed as the
+ * reorder buffer's filter_change callback.
+ *
+ * Returns true if the plugin says changes of 'change_type' to 'relid' can be
+ * skipped.  '*cache_valid' tells the caller whether that answer is final; it
+ * defaults to true, so it is always set on return, including when the plugin
+ * provides no callback (in which case nothing is filtered).
+ */
+static bool
+filter_change_cb_wrapper(ReorderBuffer *cache, Oid relid,
+					  ReorderBufferChangeType change_type, bool in_txn,
+					  bool *cache_valid)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	Assert(!ctx->fast_forward);
+
+	/*
+	 * Treat the answer as final unless the plugin says otherwise, so that the
+	 * caller never reads an unset flag.
+	 */
+	*cache_valid = true;
+
+	/* check if the filter change callback is supported */
+	if (ctx->callbacks.filter_change_cb == NULL)
+		return false;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_change";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+	ctx->end_xact = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_change_cb(ctx, relid, change_type, in_txn, cache_valid);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 static void
 message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				   XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5974c77..052d91f 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -5550,9 +5550,11 @@ ReorderBufferMemoryResetcallback(void *arg)
 bool
 ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 									XLogRecPtr lsn, RelFileLocator *rlocator,
+									ReorderBufferChangeType change_type,
 									bool has_tuple)
 {
 	bool		found;
+	bool		cache_valid;
 	Relation	relation;
 	bool		using_subtxn;
 	Snapshot	snapshot_now;
@@ -5566,7 +5568,6 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 	Assert(txn);
 	toptxn = rbtxn_get_toptxn(txn);
-	rb->can_filter_change = false;
 
 	/*
 	 * We cannot construct an accurate historical snapshot until all pending
@@ -5583,7 +5584,25 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 	if (found)
 	{
 		rb->can_filter_change = entry->filterable;
-		return entry->filterable;
+
+		/*
+		 * Quick return if we already know that the relation is not to be decoded.
+		 * These are for special relations that are unlogged and for sequences
+		 * and catalogs.
+		 */
+		if (entry->filterable)
+			return true;
+
+		/* Allow the output plugin to filter relations */
+		rb->can_filter_change = rb->filter_change(rb, entry->relid, change_type,
+												  false, &cache_valid);
+
+		/*
+		 * If plugin had the relation ready in cache, the response is valid, else
+		 * we'll need to call the plugin a second time within a transaction.
+		 */
+		if (cache_valid)
+			return rb->can_filter_change;
 	}
 
 	/* constructs a temporary historical snapshot */
@@ -5607,18 +5626,30 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 
 	if (RelationIsValid(relation))
 	{
-		entry->relid = RelationGetRelid(relation);
+		if (IsToastRelation(relation))
+		{
+			Oid     real_reloid = InvalidOid;
+			char   *toast_name = RelationGetRelationName(relation);
+			/* pg_toast_ len is 9 */
+			char   *start_ch = &toast_name[9];
+
+			real_reloid = pg_strtoint32(start_ch);
+			entry->relid = real_reloid;
+		}
+		else
+			entry->relid = RelationGetRelid(relation);
+
 		entry->filterable = false;
+		rb->can_filter_change = rb->filter_change(rb, entry->relid, change_type,
+												  true, &cache_valid);
 		RelationClose(relation);
 	}
 	else
 	{
 		entry->relid = InvalidOid;
-		entry->filterable = true;
+		rb->can_filter_change = entry->filterable = true;
 	}
 
-	rb->can_filter_change = entry->filterable;
-
 	ReorderBufferFreeSnap(rb, snapshot_now);
 
 	TeardownHistoricSnapshot(false);
@@ -5634,5 +5665,5 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 	if (using_subtxn)
 		RollbackAndReleaseCurrentSubTransaction();
 
-	return entry->filterable;
+	return rb->can_filter_change;
 }
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index a363c88..c15b053 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -56,6 +56,9 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static bool pgoutput_filter_change(LogicalDecodingContext *ctx, Oid relid,
+								   ReorderBufferChangeType change_type, bool in_txn,
+								   bool *cache_valid);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn);
 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -267,6 +270,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
 	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
+	cb->filter_change_cb = pgoutput_filter_change;
 	cb->shutdown_cb = pgoutput_shutdown;
 
 	/* transaction streaming */
@@ -1742,6 +1746,56 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 }
 
 /*
+ * Decide whether a change of 'change_type' to relation 'relid' can be skipped
+ * because the relation's pubactions do not include that kind of change.
+ * See the comments atop of LogicalDecodeFilterChangeCB for details.
+ */
+static bool
+pgoutput_filter_change(LogicalDecodingContext *ctx, Oid relid,
+					   ReorderBufferChangeType change_type, bool in_txn, bool *cache_valid)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	RelationSyncEntry *entry;
+
+	Assert(RelationSyncCache != NULL);
+
+	if (in_txn)
+	{
+		Relation	relation = RelationIdGetRelation(relid);
+
+		entry = get_rel_sync_entry(data, relation);
+		RelationClose(relation);	/* release the reference taken above */
+		*cache_valid = true;
+	}
+	else
+	{
+		/* Outside a transaction, only an already-cached entry can be used. */
+		entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
+												  &relid,
+												  HASH_FIND, cache_valid);
+		if (!*cache_valid)
+			return false;
+	}
+
+	/*
+	 * If the relation's pubactions do not include this kind of change, return
+	 * true to tell the caller that the change can be skipped.
+	 */
+	switch (change_type)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			return !entry->pubactions.pubinsert;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			return !entry->pubactions.pubupdate;
+		case REORDER_BUFFER_CHANGE_DELETE:
+			return !entry->pubactions.pubdelete;
+		default:
+			/* allow any other changes that are not explicitly filtered */
+			return false;
+	}
+}
+
+/*
  * Shutdown the output plugin.
  *
  * Note, we don't need to clean the data->context, data->cachectx, and
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 8d4d5b71..f82bc1d 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -97,6 +97,25 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
 											   RepOriginId origin_id);
 
 /*
+ * This callback is called before a change record is decoded to determine
+ * whether the change can be filtered out before entering the reorder buffer.
+ * It returns true if the change can be skipped.
+ *
+ * Typically, the output plugin needs to refer to the system catalog to make
+ * this decision. To enhance efficiency, the plugin should cache the lookup
+ * result for each relation, minimizing catalog access on subsequent calls.
+ *
+ * If 'in_txn' is false (the reorder buffer has not started a transaction),
+ * the plugin should answer from its cache and set '*cache_valid' to true, or
+ * set '*cache_valid' to false if no cached result is available; the callback
+ * is then called again with 'in_txn' true, when the catalog can be queried.
+ */
+typedef bool (*LogicalDecodeFilterChangeCB) (struct LogicalDecodingContext *ctx,
+											 Oid relid,
+											 ReorderBufferChangeType change_type,
+											 bool in_txn, bool *cache_valid);
+
+/*
  * Called to shutdown an output plugin.
  */
 typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
@@ -222,6 +241,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+	LogicalDecodeFilterChangeCB filter_change_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
 	/* streaming of changes at prepare time */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b1267e9..b138365 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -468,6 +468,12 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										const char *prefix, Size sz,
 										const char *message);
 
+/* filter change callback signature; returns true if the change can be skipped */
+typedef bool (*ReorderBufferFilterChangeCB) (ReorderBuffer *rb,
+											 Oid relid,
+											 ReorderBufferChangeType change_type,
+											 bool in_txn, bool *cache_valid);
+
 /* begin prepare callback signature */
 typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
 											 ReorderBufferTXN *txn);
@@ -578,6 +584,7 @@ struct ReorderBuffer
 	 */
 	ReorderBufferBeginCB begin;
 	ReorderBufferApplyChangeCB apply_change;
+	ReorderBufferFilterChangeCB filter_change;
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
 	ReorderBufferMessageCB message;
@@ -751,6 +758,7 @@ extern void StartupReorderBuffer(void);
 
 extern bool ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 												XLogRecPtr lsn, RelFileLocator *rlocator,
+												ReorderBufferChangeType change_type,
 												bool has_tuple);
 extern bool ReorderBufferCanFilterChanges(ReorderBuffer *rb);
 
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 14a3bea..180eaa0 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -468,6 +468,13 @@ $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab4 (a int PRIMARY KEY)");
 $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab4_1 (a int PRIMARY KEY)");
+
+# Ensure that the subscription 'sub2' catches up with the latest changes. This
+# is necessary for the walsender to update its historic snapshot. Otherwise,
+# the walsender might retain an outdated snapshot, potentially preventing it
+# from accessing the newly created publication.
+$node_publisher->wait_for_catchup('sub2');
+
 # Since we specified publish_via_partition_root in pub_all and
 # pub_lower_level, all partition tables use their root tables' identity and
 # schema. We set the list of publications so that the FOR ALL TABLES
-- 
1.8.3.1

