From 944993771427e94d8e74411303c876b61d757bc9 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Tue, 11 Feb 2025 23:49:50 -0500
Subject: [PATCH v13 3/3] Introduce a output plugin callback to filter changes

This new output plugin callback provides an option to logical decoding plugins to filter out
changes early. The primary purpose of the callback is to conserve memory and processing cycles
by excluding changes that are not required by output plugins.
---
 doc/src/sgml/logicaldecoding.sgml               | 41 ++++++++++++++-
 src/backend/replication/logical/decode.c        | 70 ++++++++++++++++++++++++-
 src/backend/replication/logical/logical.c       | 42 +++++++++++++++
 src/backend/replication/logical/reorderbuffer.c | 45 +++++++++++++---
 src/backend/replication/pgoutput/pgoutput.c     | 70 +++++++++++++++++++++++++
 src/include/replication/output_plugin.h         | 20 +++++++
 src/include/replication/reorderbuffer.h         |  8 +++
 src/test/subscription/t/001_rep_changes.pl      | 18 ++++++-
 src/test/subscription/t/013_partition.pl        |  7 +++
 9 files changed, 308 insertions(+), 13 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 1c4ae38..a603c47 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -560,6 +560,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeCommitCB commit_cb;
     LogicalDecodeMessageCB message_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+    LogicalDecodeFilterChangeCB filter_change_cb;
     LogicalDecodeShutdownCB shutdown_cb;
     LogicalDecodeFilterPrepareCB filter_prepare_cb;
     LogicalDecodeBeginPrepareCB begin_prepare_cb;
@@ -582,8 +583,8 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      and <function>commit_cb</function> callbacks are required,
      while <function>startup_cb</function>, <function>truncate_cb</function>,
      <function>message_cb</function>, <function>filter_by_origin_cb</function>,
-     and <function>shutdown_cb</function> are optional.
-     If <function>truncate_cb</function> is not set but a
+     <function>shutdown_cb</function>, and <function>filter_change_cb</function>
+     are optional. If <function>truncate_cb</function> is not set but a
      <command>TRUNCATE</command> is to be decoded, the action will be ignored.
     </para>
 
@@ -871,6 +872,42 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
      </para>
      </sect3>
 
+     <sect3 id="logicaldecoding-output-plugin-filter-change">
+     <title>Change Filter Callback</title>
+
+     <para>
+      The optional <function>filter_change_cb</function> is called before a
+      change record is decoded to determine whether the change can be filtered
+      out.
+<programlisting>
+typedef bool (*LogicalDecodeFilterChangeCB) (struct LogicalDecodingContext *ctx,
+                                             Oid relid,
+                                             ReorderBufferChangeType change_type,
+                                             bool in_txn, bool *cache_valid);
+</programlisting>
+      To indicate that decoding can be skipped for the given change
+      <parameter>change_type</parameter>, return <literal>true</literal>;
+      <literal>false</literal> otherwise.
+      The <parameter>in_txn</parameter> parameter indicates whether the
+      callback is invoked within a transaction block.
+      When <parameter>in_txn</parameter> is false, and if making a decision to filter a change requires being inside a
+      transaction block, such as needing access to the catalog, set
+      <parameter>*cache_valid</parameter> to <literal>false</literal>.
+      This ensures that the callback will be reinvoked once a transaction block
+      starts. If a decision can be made immediately, set
+      <parameter>*cache_valid</parameter> to <literal>true</literal>.
+     </para>
+     <para>
+      The primary purpose of this callback function is to optimize memory usage
+      and processing efficiency by filtering out changes that are unnecessary for
+      output plugins. It enables output plugins to selectively process relevant
+      changes. Caching filtering decisions locally is recommended, as it enables
+      the callback to provide cached results without repeatedly initiating
+      transactions or querying the catalog. This approach minimizes overhead
+      and improves efficiency during the decoding process.
+     </para>
+     </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-message">
      <title>Generic Message Callback</title>
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c5f1083..fd6a177 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -576,6 +576,17 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 }
 
 /*
+ * Check if filtering changes before decoding is supported and we're not suppressing filter
+ * changes currently.
+ */
+static inline bool
+FilterChangeIsEnabled(LogicalDecodingContext *ctx)
+{
+	return (ctx->callbacks.filter_change_cb != NULL &&
+				ctx->reorder->can_filter_change);
+}
+
+/*
  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
  */
 void
@@ -915,9 +926,16 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
-	if (ctx->reorder->can_filter_change &&
+	/*
+	 * When filtering changes, determine if the relation associated with the change
+	 * can be skipped. This could be because the relation is unlogged or because
+	 * the plugin has opted to exclude this relation from decoding.
+	 */
+	if (FilterChangeIsEnabled(ctx) &&
 		ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
-											buf->origptr, &target_locator, true))
+											buf->origptr, &target_locator,
+											REORDER_BUFFER_CHANGE_INSERT,
+											true))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -970,6 +988,18 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
+	/*
+	 * When filtering changes, determine if the relation associated with the change
+	 * can be skipped. This could be because the relation is unlogged or because
+	 * the plugin has opted to exclude this relation from decoding.
+	 */
+	if (FilterChangeIsEnabled(ctx) &&
+		ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
+											buf->origptr, &target_locator,
+											REORDER_BUFFER_CHANGE_UPDATE,
+											true))
+		return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_UPDATE;
 	change->origin_id = XLogRecGetOrigin(r);
@@ -1036,6 +1066,18 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
+	/*
+	 * When filtering changes, determine if the relation associated with the change
+	 * can be skipped. This could be because the relation is unlogged or because
+	 * the plugin has opted to exclude this relation from decoding.
+	 */
+	if (FilterChangeIsEnabled(ctx) &&
+		ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
+											buf->origptr, &target_locator,
+											REORDER_BUFFER_CHANGE_DELETE,
+											true))
+		return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 
 	if (xlrec->flags & XLH_DELETE_IS_SUPER)
@@ -1139,6 +1181,18 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/*
+	 * When filtering changes, determine if the relation associated with the change
+	 * can be skipped. This could be because the relation is unlogged or because
+	 * the plugin has opted to exclude this relation from decoding.
+	 */
+	if (FilterChangeIsEnabled(ctx) &&
+		ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
+											buf->origptr, &rlocator,
+											REORDER_BUFFER_CHANGE_INSERT,
+											true))
+		return;
+
+	/*
 	 * We know that this multi_insert isn't for a catalog, so the block should
 	 * always have data even if a full-page write of it is taken.
 	 */
@@ -1231,6 +1285,18 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
+	/*
+	 * When filtering changes, determine if the relation associated with the change
+	 * can be skipped. This could be because the relation is unlogged or because
+	 * the plugin has opted to exclude this relation from decoding.
+	 */
+	if (FilterChangeIsEnabled(ctx) &&
+		ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
+											buf->origptr, &target_locator,
+											REORDER_BUFFER_CHANGE_INSERT,
+											true))
+		return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
 	change->origin_id = XLogRecGetOrigin(r);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8ea846b..635e1de 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -74,6 +74,9 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr message_lsn, bool transactional,
 							   const char *prefix, Size message_size, const char *message);
+static bool filter_change_cb_wrapper(ReorderBuffer *cache, Oid relid,
+								  ReorderBufferChangeType change_type, bool in_txn,
+								  bool *cache_valid);
 
 /* streaming callbacks */
 static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -220,6 +223,7 @@ StartupDecodingContext(List *output_plugin_options,
 	/* wrap output plugin callbacks, so we can add error context information */
 	ctx->reorder->begin = begin_cb_wrapper;
 	ctx->reorder->apply_change = change_cb_wrapper;
+	ctx->reorder->filter_change = filter_change_cb_wrapper;
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
@@ -1224,6 +1228,44 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+static bool
+filter_change_cb_wrapper(ReorderBuffer *cache, Oid relid,
+					  ReorderBufferChangeType change_type, bool in_txn,
+					  bool *cache_valid)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	Assert(!ctx->fast_forward);
+
+	/* check if the filter change callback is supported */
+	if (ctx->callbacks.filter_change_cb == NULL)
+		return false;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_change";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+	ctx->end_xact = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_change_cb(ctx, relid, change_type, in_txn, cache_valid);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 static void
 message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				   XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 7be0033..9a1dfb6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -5549,9 +5549,11 @@ ReorderBufferMemoryResetcallback(void *arg)
 bool
 ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 									XLogRecPtr lsn, RelFileLocator *rlocator,
+									ReorderBufferChangeType change_type,
 									bool has_tuple)
 {
 	bool		found;
+	bool		cache_valid;
 	Relation	relation;
 	bool		using_subtxn;
 	Snapshot	snapshot_now;
@@ -5565,7 +5567,6 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 	Assert(txn);
 	toptxn = rbtxn_get_toptxn(txn);
-	rb->can_filter_change = false;
 
 	/*
 	 * We cannot construct an accurate historical snapshot until all pending
@@ -5582,7 +5583,25 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 	if (found)
 	{
 		rb->can_filter_change = entry->filterable;
-		return entry->filterable;
+
+		/*
+		 * Quick return if we already know that the relation is not to be decoded.
+		 * These are for special relations that are unlogged and for sequences
+		 * and catalogs.
+		 */
+		if (entry->filterable)
+			return true;
+
+		/* Allow the output plugin to filter relations */
+		rb->can_filter_change = rb->filter_change(rb, entry->relid, change_type,
+												  false, &cache_valid);
+
+		/*
+		 * If plugin had the relation ready in cache, the response is valid, else
+		 * we'll need to call the plugin a second time within a transaction.
+		 */
+		if (cache_valid)
+			return rb->can_filter_change;
 	}
 
 	/* constructs a temporary historical snapshot */
@@ -5606,18 +5625,30 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 
 	if (RelationIsValid(relation))
 	{
-		entry->relid = RelationGetRelid(relation);
+		if (IsToastRelation(relation))
+		{
+			Oid     real_reloid = InvalidOid;
+			char   *toast_name = RelationGetRelationName(relation);
+			/* pg_toast_ len is 9 */
+			char   *start_ch = &toast_name[9];
+
+			real_reloid = pg_strtoint32(start_ch);
+			entry->relid = real_reloid;
+		}
+		else
+			entry->relid = RelationGetRelid(relation);
+
 		entry->filterable = false;
+		rb->can_filter_change = rb->filter_change(rb, entry->relid, change_type,
+												  true, &cache_valid);
 		RelationClose(relation);
 	}
 	else
 	{
 		entry->relid = InvalidOid;
-		entry->filterable = true;
+		rb->can_filter_change = entry->filterable = true;
 	}
 
-	rb->can_filter_change = entry->filterable;
-
 	ReorderBufferFreeSnap(rb, snapshot_now);
 
 	TeardownHistoricSnapshot(false);
@@ -5633,5 +5664,5 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 	if (using_subtxn)
 		RollbackAndReleaseCurrentSubTransaction();
 
-	return entry->filterable;
+	return rb->can_filter_change;
 }
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7d464f6..93eb579 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -57,6 +57,9 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static bool pgoutput_filter_change(LogicalDecodingContext *ctx, Oid relid,
+								   ReorderBufferChangeType change_type, bool in_txn,
+								   bool *cache_valid);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn);
 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -268,6 +271,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
 	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
+	cb->filter_change_cb = pgoutput_filter_change;
 	cb->shutdown_cb = pgoutput_shutdown;
 
 	/* transaction streaming */
@@ -1744,6 +1748,72 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 }
 
 /*
+ * Determine whether a change to the specified relation should be published.
+ *
+ * See the comments atop of LogicalDecodeFilterChangeCB for details.
+ */
+static bool
+pgoutput_filter_change(LogicalDecodingContext *ctx, Oid relid,
+					   ReorderBufferChangeType change_type, bool in_txn, bool *cache_valid)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	RelationSyncEntry *entry;
+
+	Assert(RelationSyncCache != NULL);
+
+	if (in_txn)
+	{
+		Relation	relation;
+
+		relation = RelationIdGetRelation(relid);
+		entry = get_rel_sync_entry(data, relation);
+		*cache_valid = true;
+	}
+	else
+	{
+		entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
+												  &relid,
+												  HASH_FIND, cache_valid);
+		if (!*cache_valid)
+			return false;
+	}
+
+	/*
+	 * If the pubaction is not supported by this publication then return true to say
+	 * the change for this entry can be skipped.
+	 */
+	switch (change_type)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			if (!entry->pubactions.pubinsert)
+			{
+				elog(DEBUG1, "Filtering INSERT");
+				return true;
+			}
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			if (!entry->pubactions.pubupdate)
+			{
+				elog(DEBUG1, "Filtering UPDATE");
+				return true;
+			}
+			break;
+		case REORDER_BUFFER_CHANGE_DELETE:
+			if (!entry->pubactions.pubdelete)
+			{
+				elog(DEBUG1, "Filtering DELETE");
+				return true;
+			}
+			break;
+		default:
+			/* allow any other changes that are not explicitly filtered */
+			return false;
+	}
+
+	return false;
+}
+
+/*
  * Shutdown the output plugin.
  *
  * Note, we don't need to clean the data->context, data->cachectx, and
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 8d4d5b71..f82bc1d 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -97,6 +97,25 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
 											   RepOriginId origin_id);
 
 /*
+ * This callback is called before a change record is decoded to determine
+ * whether the change can be filtered out before entering the reorder buffer.
+ *
+ * Typically, the output plugin needs to refer to the system catalog to make
+ * this decision. To enhance efficiency, the plugin should cache the lookup
+ * result for each relation, minimizing catalog access on subsequent calls.
+ *
+ * If the callback is called with 'in_txn' set as false (the reorder
+ * buffer has not started a transaction), the output plugin should set '*cache_valid'
+ * to false to indicate that the result is not available in its internal cache.
+ * If 'in_txn' is true, the plugin can create a cache entry after querying the
+ * catalog.
+ */
+typedef bool (*LogicalDecodeFilterChangeCB) (struct LogicalDecodingContext *ctx,
+											 Oid relid,
+											 ReorderBufferChangeType change_type,
+											 bool in_txn, bool *cache_valid);
+
+/*
  * Called to shutdown an output plugin.
  */
 typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
@@ -222,6 +241,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+	LogicalDecodeFilterChangeCB filter_change_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
 	/* streaming of changes at prepare time */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 7d805d4..0b83ef4 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -468,6 +468,12 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										const char *prefix, Size sz,
 										const char *message);
 
+/* filter change callback signature */
+typedef bool (*ReorderBufferFilterChangeCB) (ReorderBuffer *rb,
+											 Oid relid,
+											 ReorderBufferChangeType change_type,
+											 bool in_txn, bool *cache_valid);
+
 /* begin prepare callback signature */
 typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
 											 ReorderBufferTXN *txn);
@@ -578,6 +584,7 @@ struct ReorderBuffer
 	 */
 	ReorderBufferBeginCB begin;
 	ReorderBufferApplyChangeCB apply_change;
+	ReorderBufferFilterChangeCB filter_change;
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
 	ReorderBufferMessageCB message;
@@ -751,6 +758,7 @@ extern void StartupReorderBuffer(void);
 
 extern bool ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 												XLogRecPtr lsn, RelFileLocator *rlocator,
+												ReorderBufferChangeType change_type,
 												bool has_tuple);
 extern bool ReorderBufferCanFilterChanges(ReorderBuffer *rb);
 
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 8726fe0..bced62c 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -485,9 +485,10 @@ $node_publisher->wait_for_catchup('tap_sub');
 
 # Check that we don't send BEGIN and COMMIT because of empty transaction
 # optimization.  We have to look for the DEBUG1 log messages about that, so
-# temporarily bump up the log verbosity.
+# temporarily bump up the log verbosity. Also confirm that unpublished
+# changes are filtered out after a restart.
 $node_publisher->append_conf('postgresql.conf', "log_min_messages = debug1");
-$node_publisher->reload;
+$node_publisher->restart;
 
 # Note that the current location of the log file is not grabbed immediately
 # after reloading the configuration, but after sending one SQL command to
@@ -495,6 +496,8 @@ $node_publisher->reload;
 $log_location = -s $node_publisher->logfile;
 
 $node_publisher->safe_psql('postgres', "INSERT INTO tab_notrep VALUES (11)");
+$node_publisher->safe_psql('postgres', "UPDATE tab_notrep SET a = 2 WHERE a = 1");
+$node_publisher->safe_psql('postgres', "DELETE FROM tab_notrep WHERE a = 2");
 
 $node_publisher->wait_for_catchup('tap_sub');
 
@@ -502,6 +505,17 @@ $logfile = slurp_file($node_publisher->logfile, $log_location);
 ok($logfile =~ qr/skipped replication of an empty transaction with XID/,
 	'empty transaction is skipped');
 
+# Check that an unpublished change is filtered out.
+$logfile = slurp_file($node_publisher->logfile, $log_location);
+ok($logfile =~ qr/Filtering INSERT/,
+	'unpublished INSERT is filtered');
+
+ok($logfile =~ qr/Filtering UPDATE/,
+	'unpublished UPDATE is filtered');
+
+ok($logfile =~ qr/Filtering DELETE/,
+	'unpublished DELETE is filtered');
+
 $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
 is($result, qq(0), 'check non-replicated table is empty on subscriber');
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 14a3bea..180eaa0 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -468,6 +468,13 @@ $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab4 (a int PRIMARY KEY)");
 $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab4_1 (a int PRIMARY KEY)");
+
+# Ensure that the subscription 'sub2' catches up with the latest changes. This
+# is necessary for the walsender to update its historic snapshot. Otherwise,
+# the walsender might retain an outdated snapshot, potentially preventing it
+# from accessing the newly created publication.
+$node_publisher->wait_for_catchup('sub2');
+
 # Since we specified publish_via_partition_root in pub_all and
 # pub_lower_level, all partition tables use their root tables' identity and
 # schema. We set the list of publications so that the FOR ALL TABLES
-- 
1.8.3.1

