From b4e23aec71a2a2193f5d94fa9b9c2285e1f1d25a Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Mon, 3 Mar 2025 04:55:13 -0500
Subject: [PATCH v15 2/3] Introduce a output plugin callback to filter changes

This new output plugin callback provides an option to logical decoding plugins to filter out
changes early. The primary purpose of the callback is to conserve memory and processing cycles
by excluding changes that are not required by output plugins.
---
 doc/src/sgml/logicaldecoding.sgml               | 41 ++++++++++++++-
 src/backend/replication/logical/decode.c        | 66 +++++++++++++++++++++--
 src/backend/replication/logical/logical.c       | 42 +++++++++++++++
 src/backend/replication/logical/reorderbuffer.c | 49 +++++++++++++----
 src/backend/replication/pgoutput/pgoutput.c     | 70 +++++++++++++++++++++++++
 src/include/replication/output_plugin.h         | 20 +++++++
 src/include/replication/reorderbuffer.h         | 10 +++-
 src/test/subscription/t/013_partition.pl        |  7 +++
 8 files changed, 289 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 1c4ae38..a603c47 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -560,6 +560,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeCommitCB commit_cb;
     LogicalDecodeMessageCB message_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+    LogicalDecodeFilterChangeCB filter_change_cb;
     LogicalDecodeShutdownCB shutdown_cb;
     LogicalDecodeFilterPrepareCB filter_prepare_cb;
     LogicalDecodeBeginPrepareCB begin_prepare_cb;
@@ -582,8 +583,8 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
      and <function>commit_cb</function> callbacks are required,
      while <function>startup_cb</function>, <function>truncate_cb</function>,
      <function>message_cb</function>, <function>filter_by_origin_cb</function>,
-     and <function>shutdown_cb</function> are optional.
-     If <function>truncate_cb</function> is not set but a
+     <function>shutdown_cb</function>, and <function>filter_change_cb</function>
+     are optional. If <function>truncate_cb</function> is not set but a
      <command>TRUNCATE</command> is to be decoded, the action will be ignored.
     </para>
 
@@ -871,6 +872,42 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
      </para>
      </sect3>
 
+     <sect3 id="logicaldecoding-output-plugin-filter-change">
+     <title>Change Filter Callback</title>
+
+     <para>
+      The optional <function>filter_change_cb</function> is called before a
+      change record is decoded to determine whether the change can be filtered
+      out.
+<programlisting>
+typedef bool (*LogicalDecodeFilterChangeCB) (struct LogicalDecodingContext *ctx,
+                                             Oid relid,
+                                             ReorderBufferChangeType change_type,
+                                             bool in_txn, bool *cache_valid);
+</programlisting>
+      To indicate that decoding can be skipped for the given change
+      <parameter>change_type</parameter>, return <literal>true</literal>;
+      <literal>false</literal> otherwise.
+      The <parameter>in_txn</parameter> parameter indicates whether the
+      callback is invoked within a transaction block.
+      When <parameter>in_txn</parameter> is false, and if making a decision to filter a change requires being inside a
+      transaction block, such as needing access to the catalog, set
+      <parameter>*cache_valid</parameter> to <literal>false</literal>.
+      This ensures that the callback will be reinvoked once a transaction block
+      starts. If a decision can be made immediately, set
+      <parameter>*cache_valid</parameter> to <literal>true</literal>.
+     </para>
+     <para>
+      The primary purpose of this callback function is to optimize memory usage
+      and processing efficiency by filtering out changes that are unnecessary for
+      output plugins. It enables output plugins to selectively process relevant
+      changes. Caching filtering decisions locally is recommended, as it enables
+      the callback to provide cached results without repeatedly initiating
+      transactions or querying the catalog. This approach minimizes overhead
+      and improves efficiency during the decoding process.
+     </para>
+     </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-message">
      <title>Generic Message Callback</title>
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index d8e0114..fc6219f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -576,6 +576,17 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 }
 
 /*
+ * Check if filtering changes before decoding is supported and we're not suppressing filter
+ * changes currently.
+ */
+static inline bool
+FilterChangeIsEnabled(LogicalDecodingContext *ctx)
+{
+	return (ctx->callbacks.filter_change_cb != NULL &&
+				ctx->reorder->can_filter_change);
+}
+
+/*
  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
  */
 void
@@ -881,6 +892,15 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	UpdateDecodingStats(ctx);
 }
 
+/* Function to determine whether to skip the change */
+static bool SkipThisChange(LogicalDecodingContext *ctx, XLogRecPtr origptr, TransactionId xid, 
+                           RelFileLocator *target_locator, ReorderBufferChangeType change_type)
+{
+	return (FilterChangeIsEnabled(ctx) &&
+			ReorderBufferFilterByRelFileLocator(ctx->reorder, xid, origptr, target_locator,
+												change_type));
+}
+
 /*
  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
  *
@@ -915,9 +935,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
-	if (ctx->reorder->can_filter_change &&
-		ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
-											buf->origptr, &target_locator))
+	/*
+	 * When filtering changes, determine if the relation associated with the change
+	 * can be skipped. This could be because the relation is unlogged or because
+	 * the plugin has opted to exclude this relation from decoding.
+	 */
+	if (SkipThisChange(ctx, buf->origptr, XLogRecGetXid(r), &target_locator,
+						REORDER_BUFFER_CHANGE_INSERT))	
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -970,6 +994,15 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
+	/*
+	 * When filtering changes, determine if the relation associated with the change
+	 * can be skipped. This could be because the relation is unlogged or because
+	 * the plugin has opted to exclude this relation from decoding.
+	 */
+	if (SkipThisChange(ctx, buf->origptr, XLogRecGetXid(r), &target_locator, 
+						REORDER_BUFFER_CHANGE_UPDATE))
+        return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_UPDATE;
 	change->origin_id = XLogRecGetOrigin(r);
@@ -1036,6 +1069,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
+	/*
+	 * When filtering changes, determine if the relation associated with the change
+	 * can be skipped. This could be because the relation is unlogged or because
+	 * the plugin has opted to exclude this relation from decoding.
+	 */
+    if (SkipThisChange(ctx, buf->origptr, XLogRecGetXid(r), &target_locator, 
+                        REORDER_BUFFER_CHANGE_DELETE))
+        return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 
 	if (xlrec->flags & XLH_DELETE_IS_SUPER)
@@ -1139,6 +1181,15 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/*
+	 * When filtering changes, determine if the relation associated with the change
+	 * can be skipped. This could be because the relation is unlogged or because
+	 * the plugin has opted to exclude this relation from decoding.
+	 */
+    if (SkipThisChange(ctx, buf->origptr, XLogRecGetXid(r), &rlocator, 
+                        REORDER_BUFFER_CHANGE_INSERT))
+        return;
+
+	/*
 	 * We know that this multi_insert isn't for a catalog, so the block should
 	 * always have data even if a full-page write of it is taken.
 	 */
@@ -1229,6 +1280,15 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
+	/*
+	 * When filtering changes, determine if the relation associated with the change
+	 * can be skipped. This could be because the relation is unlogged or because
+	 * the plugin has opted to exclude this relation from decoding.
+	 */
+    if (SkipThisChange(ctx, buf->origptr, XLogRecGetXid(r), &target_locator, 
+                        REORDER_BUFFER_CHANGE_INSERT))
+        return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
 	change->origin_id = XLogRecGetOrigin(r);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8ea846b..635e1de 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -74,6 +74,9 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							   XLogRecPtr message_lsn, bool transactional,
 							   const char *prefix, Size message_size, const char *message);
+static bool filter_change_cb_wrapper(ReorderBuffer *cache, Oid relid,
+								  ReorderBufferChangeType change_type, bool in_txn,
+								  bool *cache_valid);
 
 /* streaming callbacks */
 static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -220,6 +223,7 @@ StartupDecodingContext(List *output_plugin_options,
 	/* wrap output plugin callbacks, so we can add error context information */
 	ctx->reorder->begin = begin_cb_wrapper;
 	ctx->reorder->apply_change = change_cb_wrapper;
+	ctx->reorder->filter_change = filter_change_cb_wrapper;
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
@@ -1224,6 +1228,44 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+static bool
+filter_change_cb_wrapper(ReorderBuffer *cache, Oid relid,
+					  ReorderBufferChangeType change_type, bool in_txn,
+					  bool *cache_valid)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	Assert(!ctx->fast_forward);
+
+	/* check if the filter change callback is supported */
+	if (ctx->callbacks.filter_change_cb == NULL)
+		return false;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_change";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+	ctx->end_xact = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_change_cb(ctx, relid, change_type, in_txn, cache_valid);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 static void
 message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				   XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5c30cf0..7a7d775 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -371,7 +371,8 @@ static void ReorderBufferMemoryResetcallback(void *arg);
 
 /*
  * After encountering a change that cannot be filtered out, filtering is
- * temporarily suspended. Filtering resumes after processing every 100 changes.
+ * temporarily suspended. Filtering resumes after processing CHANGES_THRESHOLD_FOR_FILTER
+ * changes.
  * This strategy helps to minimize the overhead of performing a hash table
  * search for each record, especially when most changes are not filterable.
  */
@@ -5703,8 +5704,8 @@ ReorderBufferMemoryResetcallback(void *arg)
  */
 bool
 ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
-									XLogRecPtr lsn, RelFileLocator *rlocator)
-
+									XLogRecPtr lsn, RelFileLocator *rlocator,
+									ReorderBufferChangeType change_type)
 {
 	bool		found;
 	Relation	relation;
@@ -5720,7 +5721,6 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 	Assert(txn);
 	toptxn = rbtxn_get_toptxn(txn);
-	rb->can_filter_change = false;
 
 	/*
 	 * We cannot construct an accurate historical snapshot until all pending
@@ -5737,7 +5737,25 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 	if (found)
 	{
 		rb->can_filter_change = entry->filterable;
-		return entry->filterable;
+
+		/*
+		 * Quick return if we already know that the relation is not to be decoded.
+		 * These are for special relations that are unlogged and for sequences
+		 * and catalogs.
+		 */
+		if (entry->filterable)
+			return true;
+
+		/* Allow the output plugin to filter relations */
+		rb->can_filter_change = rb->filter_change(rb, entry->relid, change_type,
+												  false, &cache_valid);
+
+		/*
+		 * If plugin had the relation ready in cache, the response is valid, else
+		 * we'll need to call the plugin a second time within a transaction.
+		 */
+		if (cache_valid)
+			return rb->can_filter_change;
 	}
 
 	/* constructs a temporary historical snapshot */
@@ -5761,18 +5779,29 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 
 	if (RelationIsValid(relation))
 	{
-		entry->relid = RelationGetRelid(relation);
+		if (IsToastRelation(relation))
+		{
+			char   *toast_name = RelationGetRelationName(relation);
+			int		n PG_USED_FOR_ASSERTS_ONLY;
+
+			n = sscanf(toast_name, "pg_toast_%u", &entry->relid);
+
+			Assert(n == 1);
+		}
+		else
+			entry->relid = RelationGetRelid(relation);
+
 		entry->filterable = false;
+		rb->can_filter_change = rb->filter_change(rb, entry->relid, change_type,
+												  true, &cache_valid);
 		RelationClose(relation);
 	}
 	else
 	{
 		entry->relid = InvalidOid;
-		entry->filterable = true;
+		rb->can_filter_change = entry->filterable = true;
 	}
 
-	rb->can_filter_change = entry->filterable;
-
 	ReorderBufferFreeSnap(rb, snapshot_now);
 
 	TeardownHistoricSnapshot(false);
@@ -5788,5 +5817,5 @@ ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
 	if (using_subtxn)
 		RollbackAndReleaseCurrentSubTransaction();
 
-	return entry->filterable;
+	return rb->can_filter_change;
 }
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7d464f6..93eb579 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -57,6 +57,9 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static bool pgoutput_filter_change(LogicalDecodingContext *ctx, Oid relid,
+								   ReorderBufferChangeType change_type, bool in_txn,
+								   bool *cache_valid);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn);
 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -268,6 +271,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
 	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
+	cb->filter_change_cb = pgoutput_filter_change;
 	cb->shutdown_cb = pgoutput_shutdown;
 
 	/* transaction streaming */
@@ -1744,6 +1748,72 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 }
 
 /*
+ * Determine whether a change to the specified relation should be published.
+ *
+ * See the comments atop of LogicalDecodeFilterChangeCB for details.
+ */
+static bool
+pgoutput_filter_change(LogicalDecodingContext *ctx, Oid relid,
+					   ReorderBufferChangeType change_type, bool in_txn, bool *cache_valid)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	RelationSyncEntry *entry;
+
+	Assert(RelationSyncCache != NULL);
+
+	if (in_txn)
+	{
+		Relation	relation;
+
+		relation = RelationIdGetRelation(relid);
+		entry = get_rel_sync_entry(data, relation);
+		*cache_valid = true;
+	}
+	else
+	{
+		entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
+												  &relid,
+												  HASH_FIND, cache_valid);
+		if (!*cache_valid)
+			return false;
+	}
+
+	/*
+	 * If the pubaction is not supported by this publication then return true to say
+	 * the change for this entry can be skipped.
+	 */
+	switch (change_type)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			if (!entry->pubactions.pubinsert)
+			{
+				elog(DEBUG1, "Filtering INSERT");
+				return true;
+			}
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			if (!entry->pubactions.pubupdate)
+			{
+				elog(DEBUG1, "Filtering UPDATE");
+				return true;
+			}
+			break;
+		case REORDER_BUFFER_CHANGE_DELETE:
+			if (!entry->pubactions.pubdelete)
+			{
+				elog(DEBUG1, "Filtering DELETE");
+				return true;
+			}
+			break;
+		default:
+			/* allow any other changes that are not explicitly filtered */
+			return false;
+	}
+
+	return false;
+}
+
+/*
  * Shutdown the output plugin.
  *
  * Note, we don't need to clean the data->context, data->cachectx, and
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 8d4d5b71..f82bc1d 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -97,6 +97,25 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
 											   RepOriginId origin_id);
 
 /*
+ * This callback is called before a change record is decoded to determine
+ * whether the change can be filtered out before entering the reorder buffer.
+ *
+ * Typically, the output plugin needs to refer to the system catalog to make
+ * this decision. To enhance efficiency, the plugin should cache the lookup
+ * result for each relation, minimizing catalog access on subsequent calls.
+ *
+ * If the callback is called with 'in_txn' set as false (the reorder
+ * buffer has not started a transaction), the output plugin should set '*cache_valid'
+ * to false to indicate that the result is not available in its internal cache.
+ * If 'in_txn' is true, the plugin can create a cache entry after querying the
+ * catalog.
+ */
+typedef bool (*LogicalDecodeFilterChangeCB) (struct LogicalDecodingContext *ctx,
+											 Oid relid,
+											 ReorderBufferChangeType change_type,
+											 bool in_txn, bool *cache_valid);
+
+/*
  * Called to shutdown an output plugin.
  */
 typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
@@ -222,6 +241,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+	LogicalDecodeFilterChangeCB filter_change_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
 	/* streaming of changes at prepare time */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 24166c2..d45b261 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -494,6 +494,12 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										const char *prefix, Size sz,
 										const char *message);
 
+/* filter change callback signature */
+typedef bool (*ReorderBufferFilterChangeCB) (ReorderBuffer *rb,
+											 Oid relid,
+											 ReorderBufferChangeType change_type,
+											 bool in_txn, bool *cache_valid);
+
 /* begin prepare callback signature */
 typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
 											 ReorderBufferTXN *txn);
@@ -604,6 +610,7 @@ struct ReorderBuffer
 	 */
 	ReorderBufferBeginCB begin;
 	ReorderBufferApplyChangeCB apply_change;
+	ReorderBufferFilterChangeCB filter_change;
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
 	ReorderBufferMessageCB message;
@@ -776,7 +783,8 @@ extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
 extern void StartupReorderBuffer(void);
 
 extern bool ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
-												XLogRecPtr lsn, RelFileLocator *rlocator);
+												XLogRecPtr lsn, RelFileLocator *rlocator,
+												ReorderBufferChangeType change_type);
 extern bool ReorderBufferCanFilterChanges(ReorderBuffer *rb);
 
 #endif
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 61b0cb4..a64b27e 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -472,6 +472,13 @@ $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab4 (a int PRIMARY KEY)");
 $node_subscriber2->safe_psql('postgres',
 	"CREATE TABLE tab4_1 (a int PRIMARY KEY)");
+
+# Ensure that the subscription 'sub2' catches up with the latest changes. This
+# is necessary for the walsender to update its historic snapshot. Otherwise,
+# the walsender might retain an outdated snapshot, potentially preventing it
+# from accessing the newly created publication.
+$node_publisher->wait_for_catchup('sub2');
+
 # Since we specified publish_via_partition_root in pub_all and
 # pub_lower_level, all partition tables use their root tables' identity and
 # schema. We set the list of publications so that the FOR ALL TABLES
-- 
1.8.3.1

