Hi all,

Commit ac4645c015 allows pgoutput to send logical decoding messages,
but it's limited to applications that use the pgoutput plugin -- the
built-in logical replication doesn't use it. I'd like to propose
introducing a hook to the logical replication message handling so that
extensions can plug in their own handling routine. Extensions could use
this to implement DDL replication or function replication, or to trigger
user-specific routines on the subscriber side.

I've attached the PoC patch; it adds a hook function, and adds a new
'message' subscription option that allows the user to request the
publisher to send logical decoding messages. Therefore, users need to
enable the 'message' option and set up the hook function at server
startup in order to receive the messages and trigger the hook
function.

I went with a hook function in the patch. While it lets you chain
multiple hook functions, providing a registration API might be
better, or other types of registry could also be considered.

Feedback is very welcome.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
From 546809304986e5cdcdee4ee41387c5edeb435fb2 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <[email protected]>
Date: Mon, 8 Jun 2026 11:25:47 -0700
Subject: [PATCH v1] Add a hook for handling logical messages on subscribers.

Add a "message" subscription option (default off) that asks the
publisher's output plugin to stream messages, and dispatch each
received message to a new LogicalRepMessageHandle_hook.  When the option
is off or no hook is installed, behavior is unchanged.  This lets an
extension act on logical messages on the subscriber -- for instance to
carry deparsed DDL to a custom apply worker -- without modifying core
apply code.

Message handling honors transaction streaming and ALTER SUBSCRIPTION
... SKIP.  The new test_logicalmsg_hooks module covers basic delivery,
SKIP, and non-transactional messages.

XXX Bump catalog version for the new pg_subscription.submessage column.
---
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   2 +-
 src/backend/commands/subscriptioncmds.c       |  24 ++-
 .../libpqwalreceiver/libpqwalreceiver.c       |   4 +
 src/backend/replication/logical/proto.c       |  25 +++
 src/backend/replication/logical/worker.c      |  42 +++-
 src/bin/pg_dump/pg_dump.c                     |  16 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   5 +-
 src/include/catalog/pg_subscription.h         |   5 +
 src/include/replication/logicalproto.h        |  13 ++
 src/include/replication/walreceiver.h         |   1 +
 src/include/replication/worker_internal.h     |   4 +
 src/test/modules/Makefile                     |   1 +
 src/test/modules/meson.build                  |   1 +
 .../modules/test_logicalmsg_hooks/.gitignore  |   4 +
 .../modules/test_logicalmsg_hooks/Makefile    |  20 ++
 .../modules/test_logicalmsg_hooks/meson.build |  28 +++
 .../test_logicalmsg_hooks/t/001_basic.pl      | 113 ++++++++++
 .../test_logicalmsg_hooks.c                   |  40 ++++
 src/test/regress/expected/subscription.out    | 202 ++++++++++--------
 src/test/regress/sql/subscription.sql         |  17 ++
 src/tools/pgindent/typedefs.list              |   1 +
 23 files changed, 469 insertions(+), 101 deletions(-)
 create mode 100644 src/test/modules/test_logicalmsg_hooks/.gitignore
 create mode 100644 src/test/modules/test_logicalmsg_hooks/Makefile
 create mode 100644 src/test/modules/test_logicalmsg_hooks/meson.build
 create mode 100644 src/test/modules/test_logicalmsg_hooks/t/001_basic.pl
 create mode 100644 src/test/modules/test_logicalmsg_hooks/test_logicalmsg_hooks.c

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index b5cb301db88..f14649a245b 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -132,6 +132,7 @@ GetSubscription(Oid subid, bool missing_ok, bool conninfo_needed,
 	sub->retaindeadtuples = subform->subretaindeadtuples;
 	sub->maxretention = subform->submaxretention;
 	sub->retentionactive = subform->subretentionactive;
+	sub->message = subform->submessage;
 
 	if (conninfo_needed)
 	{
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 8f129baec90..6ebaa93b1ff 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1528,7 +1528,7 @@ GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
 			  subpasswordrequired, subrunasowner, subfailover,
               subretaindeadtuples, submaxretention, subretentionactive,
               subserver, subslotname, subsynccommit, subwalrcvtimeout,
-              subpublications, suborigin)
+              subpublications, suborigin, submessage)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index ee06a726f42..5183ab01156 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -79,6 +79,7 @@
 #define SUBOPT_WAL_RECEIVER_TIMEOUT			0x00010000
 #define SUBOPT_LSN					0x00020000
 #define SUBOPT_ORIGIN				0x00040000
+#define SUBOPT_MESSAGE				0x00080000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -105,6 +106,7 @@ typedef struct SubOpts
 	bool		runasowner;
 	bool		failover;
 	bool		retaindeadtuples;
+	bool		message;
 	int32		maxretention;
 	char	   *origin;
 	XLogRecPtr	lsn;
@@ -192,6 +194,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->failover = false;
 	if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
 		opts->retaindeadtuples = false;
+	if (IsSet(supported_opts, SUBOPT_MESSAGE))
+		opts->message = false;
 	if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
 		opts->maxretention = 0;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
@@ -348,6 +352,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
 			opts->retaindeadtuples = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_MESSAGE) &&
+				 strcmp(defel->defname, "message") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_MESSAGE))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_MESSAGE;
+			opts->message = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
 				 strcmp(defel->defname, "max_retention_duration") == 0)
 		{
@@ -674,7 +687,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 					  SUBOPT_RETAIN_DEAD_TUPLES |
 					  SUBOPT_MAX_RETENTION_DURATION |
-					  SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN);
+					  SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN | SUBOPT_MESSAGE);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -824,6 +837,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
 	values[Anum_pg_subscription_subretaindeadtuples - 1] =
 		BoolGetDatum(opts.retaindeadtuples);
+	values[Anum_pg_subscription_submessage - 1] = BoolGetDatum(opts.message);
 	values[Anum_pg_subscription_submaxretention - 1] =
 		Int32GetDatum(opts.maxretention);
 	values[Anum_pg_subscription_subretentionactive - 1] =
@@ -1499,7 +1513,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 							  SUBOPT_RETAIN_DEAD_TUPLES |
 							  SUBOPT_MAX_RETENTION_DURATION |
 							  SUBOPT_WAL_RECEIVER_TIMEOUT |
-							  SUBOPT_ORIGIN);
+							  SUBOPT_ORIGIN | SUBOPT_MESSAGE);
 			break;
 
 		case ALTER_SUBSCRIPTION_ENABLED:
@@ -1858,6 +1872,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subwalrcvtimeout - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_MESSAGE))
+				{
+					values[Anum_pg_subscription_submessage - 1] = BoolGetDatum(opts.message);
+					replaces[Anum_pg_subscription_submessage - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 5376519fea5..490e0590970 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -614,6 +614,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendQuotedLiteral(&cmd, options->proto.logical.origin);
 		}
 
+		if (options->proto.logical.messages &&
+			PQserverVersion(conn->streamConn) >= 140000)
+			appendStringInfo(&cmd, ", messages 'on'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(pubnames);
 		appendStringInfoString(&cmd, ", publication_names ");
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 86ad97cd937..af8ee26ed0c 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -660,6 +660,31 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 	pq_sendbytes(out, message, sz);
 }
 
+/*
+ * Read MESSAGE from stream.
+ */
+void
+logicalrep_read_message(StringInfo in, LogicalRepMessageData *msg_data)
+{
+	Size		len;
+	char	   *msg;
+
+	msg_data->flags = pq_getmsgint(in, 1);
+	msg_data->lsn = pq_getmsgint64(in);
+	msg_data->prefix = pstrdup(pq_getmsgstring(in));
+
+	/* read message length */
+	len = pq_getmsgint(in, 4);
+	msg_data->message_size = len;
+
+	/* and data */
+	msg = palloc(len + 1);
+	pq_copymsgbytes(in, msg, len);
+
+	msg[len] = '\0';
+	msg_data->message = msg;
+}
+
 /*
  * Write relation description to the output stream.
  */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7799266c614..3457ed7b379 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -549,6 +549,9 @@ typedef struct ApplySubXactData
 
 static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
 
+/* Hook for plugins to get control in handling logical decoding messages */
+LogicalRepMessageHandle_hook_type LogicalRepMessageHandle_hook = NULL;
+
 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
 
@@ -1694,6 +1697,34 @@ apply_handle_origin(StringInfo s)
 				 errmsg_internal("ORIGIN message sent out of order")));
 }
 
+/*
+ * Handle MESSAGE message.
+ *
+ * Invoke a hook function if set.
+ */
+static void
+apply_handle_message(StringInfo s)
+{
+	LogicalRepMessageData msg;
+
+	if (!LogicalRepMessageHandle_hook)
+		return;
+
+	/*
+	 * Logical messages are handled only by the (parallel) apply workers.
+	 */
+	if (am_tablesync_worker() || am_sequencesync_worker())
+		return;
+
+	if (is_skipping_changes() ||
+		handle_streamed_transaction(LOGICAL_REP_MSG_MESSAGE, s))
+		return;
+
+	logicalrep_read_message(s, &msg);
+
+	(*LogicalRepMessageHandle_hook) (&msg);
+}
+
 /*
  * Initialize fileset (if not already done).
  *
@@ -3846,12 +3877,7 @@ apply_dispatch(StringInfo s)
 			break;
 
 		case LOGICAL_REP_MSG_MESSAGE:
-
-			/*
-			 * Logical replication does not use generic logical messages yet.
-			 * Although, it could be used by other applications that use this
-			 * output plugin.
-			 */
+			apply_handle_message(s);
 			break;
 
 		case LOGICAL_REP_MSG_STREAM_START:
@@ -5128,7 +5154,8 @@ maybe_reread_subscription(void)
 		newsub->passwordrequired != MySubscription->passwordrequired ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
-		!equal(newsub->publications, MySubscription->publications))
+		!equal(newsub->publications, MySubscription->publications) ||
+		newsub->message != MySubscription->message)
 	{
 		if (am_parallel_apply_worker())
 			ereport(LOG,
@@ -5614,6 +5641,7 @@ set_stream_options(WalRcvStreamOptions *options,
 
 	options->proto.logical.twophase = false;
 	options->proto.logical.origin = pstrdup(MySubscription->origin);
+	options->proto.logical.messages = MySubscription->message;
 }
 
 /*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index c56437d6057..4441b14082c 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5184,6 +5184,7 @@ getSubscriptions(Archive *fout)
 	int			i_subfailover;
 	int			i_subretaindeadtuples;
 	int			i_submaxretention;
+	int			i_submessage;
 	int			i,
 				ntups;
 
@@ -5282,9 +5283,14 @@ getSubscriptions(Archive *fout)
 							 " '-1' AS subwalrcvtimeout,\n");
 
 	if (fout->remoteVersion >= 190000)
-		appendPQExpBufferStr(query, " fs.srvname AS subservername\n");
+		appendPQExpBufferStr(query, " fs.srvname AS subservername,\n");
 	else
-		appendPQExpBufferStr(query, " NULL AS subservername\n");
+		appendPQExpBufferStr(query, " NULL AS subservername,\n");
+
+	if (fout->remoteVersion >= 190000)
+		appendPQExpBufferStr(query, " s.submessage\n");
+	else
+		appendPQExpBufferStr(query, "false AS submessage\n");
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n");
@@ -5333,6 +5339,7 @@ getSubscriptions(Archive *fout)
 	i_subpublications = PQfnumber(res, "subpublications");
 	i_suborigin = PQfnumber(res, "suborigin");
 	i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
+	i_submessage = PQfnumber(res, "submessage");
 
 	subinfo = pg_malloc_array(SubscriptionInfo, ntups);
 
@@ -5390,6 +5397,8 @@ getSubscriptions(Archive *fout)
 		else
 			subinfo[i].suboriginremotelsn =
 				pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
+		subinfo[i].submessage =
+			(strcmp(PQgetvalue(res, i, i_submessage), "t") == 0);
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -5642,6 +5651,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (subinfo->subretaindeadtuples)
 		appendPQExpBufferStr(query, ", retain_dead_tuples = true");
 
+	if (subinfo->submessage)
+		appendPQExpBufferStr(query, ", message = true");
+
 	if (subinfo->submaxretention)
 		appendPQExpBuffer(query, ", max_retention_duration = %d", subinfo->submaxretention);
 
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 5a6726d8b12..30bd1575a26 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -721,6 +721,7 @@ typedef struct _SubscriptionInfo
 	bool		subrunasowner;
 	bool		subfailover;
 	bool		subretaindeadtuples;
+	bool		submessage;
 	int			submaxretention;
 	char	   *subservername;
 	char	   *subconninfo;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index af3935b0078..ac5a2c1835b 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -7093,7 +7093,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
 		false, false, false, false, false, false, false, false, false, false,
-	false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -7179,6 +7179,9 @@ describeSubscriptions(const char *pattern, bool verbose)
 			appendPQExpBuffer(&buf,
 							  ", subretentionactive AS \"%s\"\n",
 							  gettext_noop("Retention active"));
+			appendPQExpBuffer(&buf,
+							  ", submessage AS \"%s\"\n",
+							  gettext_noop("Message"));
 		}
 
 		appendPQExpBuffer(&buf,
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 48944201889..cdda8265573 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -83,6 +83,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subretaindeadtuples;	/* True if dead tuples useful for
 										 * conflict detection are retained */
 
+	bool		submessage;		/* True if the subscription wants to receive
+								 * logical messages. */
+
 	int32		submaxretention;	/* The maximum duration (in milliseconds)
 									 * for which information useful for
 									 * conflict detection can be retained */
@@ -171,6 +174,8 @@ typedef struct Subscription
 	List	   *publications;	/* List of publication names to subscribe to */
 	char	   *origin;			/* Only publish data originating from the
 								 * specified origin */
+	bool		message;		/* True if the subscription wants to receive
+								 * logical messages */
 } Subscription;
 
 #ifdef EXPOSE_TO_CLIENT_CODE
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 058a955e20c..85add073202 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -191,6 +191,18 @@ typedef struct LogicalRepStreamAbortData
 	TimestampTz abort_time;
 } LogicalRepStreamAbortData;
 
+/*
+ * Logical decoding message information
+ */
+typedef struct LogicalRepMessageData
+{
+	uint8		flags;
+	XLogRecPtr	lsn;
+	const char *prefix;
+	Size		message_size;	/* length of message in bytes */
+	const char *message;		/* payload; may contain embedded nulls */
+} LogicalRepMessageData;
+
 extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
 extern void logicalrep_read_begin(StringInfo in,
 								  LogicalRepBeginData *begin_data);
@@ -248,6 +260,7 @@ extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
+extern void logicalrep_read_message(StringInfo in, LogicalRepMessageData *msg_data);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel, Bitmapset *columns,
 								 PublishGencolsType include_gencols_type);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 47c07574d4d..44a08cc2170 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -188,6 +188,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			bool		messages;	/* Request logical decoding messages */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 745b7d9e969..b07042846cb 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -18,6 +18,7 @@
 #include "miscadmin.h"
 #include "replication/logicalrelation.h"
 #include "replication/walreceiver.h"
+#include "replication/logicalproto.h"
 #include "storage/buffile.h"
 #include "storage/fileset.h"
 #include "storage/shm_mq.h"
@@ -255,6 +256,9 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
 
 extern PGDLLIMPORT List *table_states_not_ready;
 
+typedef void (*LogicalRepMessageHandle_hook_type) (LogicalRepMessageData *msg);
+extern PGDLLIMPORT LogicalRepMessageHandle_hook_type LogicalRepMessageHandle_hook;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
 												Oid subid, Oid relid,
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 0a74ab5c86f..603fe1ff907 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -36,6 +36,7 @@ SUBDIRS = \
 		  test_integerset \
 		  test_json_parser \
 		  test_lfind \
+		  test_logicalmsg_hooks \
 		  test_lwlock_tranches \
 		  test_misc \
 		  test_oat_hooks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 4bca42bb370..5f6c0cae77a 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -37,6 +37,7 @@ subdir('test_int128')
 subdir('test_integerset')
 subdir('test_json_parser')
 subdir('test_lfind')
+subdir('test_logicalmsg_hooks')
 subdir('test_lwlock_tranches')
 subdir('test_misc')
 subdir('test_oat_hooks')
diff --git a/src/test/modules/test_logicalmsg_hooks/.gitignore b/src/test/modules/test_logicalmsg_hooks/.gitignore
new file mode 100644
index 00000000000..5dcb3ff9723
--- /dev/null
+++ b/src/test/modules/test_logicalmsg_hooks/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_logicalmsg_hooks/Makefile b/src/test/modules/test_logicalmsg_hooks/Makefile
new file mode 100644
index 00000000000..af129009b8a
--- /dev/null
+++ b/src/test/modules/test_logicalmsg_hooks/Makefile
@@ -0,0 +1,20 @@
+# src/test/modules/test_logicalmsg_hooks/Makefile
+
+MODULE_big = test_logicalmsg_hooks
+OBJS = \
+	$(WIN32RES) \
+	test_logicalmsg_hooks.o
+PGFILEDESC = "test_logicalmsg_hooks - test logical replication message hooks"
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_logicalmsg_hooks
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_logicalmsg_hooks/meson.build b/src/test/modules/test_logicalmsg_hooks/meson.build
new file mode 100644
index 00000000000..8084d82e47b
--- /dev/null
+++ b/src/test/modules/test_logicalmsg_hooks/meson.build
@@ -0,0 +1,28 @@
+# Copyright (c) 2022-2026, PostgreSQL Global Development Group
+
+test_logicalmsg_hooks_sources = files(
+  'test_logicalmsg_hooks.c',
+)
+
+if host_system == 'windows'
+  test_logicalmsg_hooks_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'test_logicalmsg_hooks',
+    '--FILEDESC', 'test_logicalmsg_hooks - test logical message hooks',])
+endif
+
+test_logicalmsg_hooks = shared_module('test_logicalmsg_hooks',
+  test_logicalmsg_hooks_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += test_logicalmsg_hooks
+
+tests += {
+  'name': 'test_logicalmsg_hooks',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_basic.pl',
+    ],
+  },
+}
diff --git a/src/test/modules/test_logicalmsg_hooks/t/001_basic.pl b/src/test/modules/test_logicalmsg_hooks/t/001_basic.pl
new file mode 100644
index 00000000000..64ca51d1786
--- /dev/null
+++ b/src/test/modules/test_logicalmsg_hooks/t/001_basic.pl
@@ -0,0 +1,113 @@
+# Copyright (c) 2021-2026, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $pub = PostgreSQL::Test::Cluster->new('publisher');
+$pub->init(allows_streaming => 'logical');
+$pub->start;
+
+my $sub = PostgreSQL::Test::Cluster->new('subscriber');
+$sub->init(allows_streaming => 'logical');
+$sub->append_conf(
+	'postgresql.conf', q{
+shared_preload_libraries = 'test_logicalmsg_hooks'
+});
+$sub->start;
+
+$pub->safe_psql(
+	'postgres',
+	qq{
+CREATE TABLE test (a int);
+INSERT INTO test VALUES (1);
+CREATE PUBLICATION pub FOR ALL TABLES;
+});
+
+my $pub_connstr = $pub->connstr . ' dbname=postgres';
+
+$sub->safe_psql(
+	'postgres',
+	qq{
+CREATE TABLE test (a int primary key);
+CREATE SUBSCRIPTION sub CONNECTION '$pub_connstr' PUBLICATION pub WITH (message = true, disable_on_error = true)
+});
+
+$sub->wait_for_subscription_sync($pub, 'sub');
+
+my $log_location = -s $sub->logfile;
+
+# Emit a logical message and verify that the subscriber processed it in the
+# hook function.
+$pub->safe_psql(
+	'postgres',
+	qq{
+SELECT pg_logical_emit_message(true, 'test_prefix', 'test_mesasge');
+});
+$pub->wait_for_catchup('sub');
+$sub->wait_for_log(
+	qr/LOG[^\n]+received message: LSN [[:xdigit:]]+\/[[:xdigit:]]+, prefix: test_prefix, message: test_mesasge/,
+	$log_location);
+ok(1, "logical message is correctly handled on the subscriber");
+
+$log_location = -s $sub->logfile;
+
+# Inserting a tuple with a=1 conflicts on the subscriber. Test that ALTER SUBSCRIPTION ... SKIP
+# skips the transactional message as well.
+$pub->safe_psql(
+	'postgres',
+	q{
+BEGIN;
+INSERT INTO test VALUES (1);
+SELECT pg_logical_emit_message(true, 'conflict', 'should be skipped');
+COMMIT;
+});
+
+# Wait until a conflict occurs on the subscriber.
+$sub->poll_query_until('postgres',
+	q{SELECT subenabled = FALSE FROM pg_subscription WHERE subname = 'sub'});
+
+# Get the finish LSN of the error transaction.
+my $contents = slurp_file($sub->logfile, $log_location);
+$contents =~
+  qr/conflict detected on relation "public.test".*\n.*DETAIL:.*Could not apply remote change.*\n.*Key already exists in unique index "test_pkey", modified in transaction \d+: key .*, local row .*\n.*CONTEXT:.* for replication target relation "public.test" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m
+  or die "could not get error-LSN";
+my $lsn = $1;
+
+$log_location = -s $sub->logfile;
+
+# Set skip LSN and re-enable the subscription.
+$sub->safe_psql('postgres', qq{ALTER SUBSCRIPTION sub SKIP (lsn = '$lsn');});
+$sub->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Wait for the failed transaction to be skipped
+$sub->poll_query_until('postgres',
+	q{SELECT subskiplsn = '0/0' FROM pg_subscription WHERE subname = 'sub'});
+
+ok( !$sub->log_contains(
+		qr/LOG[^\n]+received message: LSN .*, prefix: conflict, message: should be skipped/
+	),
+	"logical message is skipped by ALTER SUBSCRIPTION SKIP");
+
+# Test that a non-transactional message is applied even if the transaction rolls back.
+$log_location = -s $sub->logfile;
+$pub->safe_psql(
+	'postgres',
+	q{
+BEGIN;
+SELECT pg_logical_emit_message(false, 'test_prefix', 'non-transactional message');
+ROLLBACK;
+});
+$pub->wait_for_catchup('sub');
+$sub->wait_for_log(
+	qr/LOG[^\n]+received message: LSN [[:xdigit:]]+\/[[:xdigit:]]+, prefix: test_prefix, message: non-transactional message/,
+	$log_location);
+ok(1, "logical message is correctly handled on the subscriber");
+
+
+$pub->stop;
+$sub->stop;
+done_testing();
diff --git a/src/test/modules/test_logicalmsg_hooks/test_logicalmsg_hooks.c b/src/test/modules/test_logicalmsg_hooks/test_logicalmsg_hooks.c
new file mode 100644
index 00000000000..9167043508e
--- /dev/null
+++ b/src/test/modules/test_logicalmsg_hooks/test_logicalmsg_hooks.c
@@ -0,0 +1,40 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_logicalmsg_hooks.c
+ *		Code for testing LogicalRepMessageHandle_hook
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/test/modules/test_logicalmsg_hooks/test_logicalmsg_hooks.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "replication/worker_internal.h"
+
+PG_MODULE_MAGIC;
+
+static void test_logical_message_handler(LogicalRepMessageData *msg);
+
+/*
+ * Module load callback
+ */
+void
+_PG_init(void)
+{
+	LogicalRepMessageHandle_hook = &test_logical_message_handler;
+}
+
+void
+test_logical_message_handler(LogicalRepMessageData *msg)
+{
+	ereport(LOG,
+			(errmsg("received message: LSN %X/%08X, prefix: %s, message: %s",
+					LSN_FORMAT_ARGS(msg->lsn),
+					msg->prefix,
+					msg->message)));
+}
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 8dbfac66326..29c2fac6504 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -139,18 +139,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+ regress_testsub4
-                                                                                                                                                                       List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                                                                                                       List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -237,10 +237,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                                                                                                          List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  |    Description    
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | test subscription
+                                                                                                                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | test subscription
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -249,10 +249,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
 ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
 \dRs+
-                                                                                                                                                                              List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | -1               | 0/00000000 | test subscription
+                                                                                                                                                                                   List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+------------------------------+------------------+------------+-------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist2 | -1               | 0/00000000 | test subscription
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -268,10 +268,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                                                                                                              List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | -1               | 0/00012345 | test subscription
+                                                                                                                                                                                   List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+------------------------------+------------------+------------+-------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist2 | -1               | 0/00012345 | test subscription
 (1 row)
 
 -- ok - with lsn = NONE
@@ -280,10 +280,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                                                                                                              List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | -1               | 0/00000000 | test subscription
+                                                                                                                                                                                   List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+------------------------------+------------------+------------+-------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist2 | -1               | 0/00000000 | test subscription
 (1 row)
 
 BEGIN;
@@ -319,10 +319,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = '80s');
 ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = 'foobar');
 ERROR:  invalid value for parameter "wal_receiver_timeout": "foobar"
 \dRs+
-                                                                                                                                                                                List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | local              | dbname=regress_doesnotexist2 | 80s              | 0/00000000 | test subscription
+                                                                                                                                                                                     List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+------------------------------+------------------+------------+-------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | local              | dbname=regress_doesnotexist2 | 80s              | 0/00000000 | test subscription
 (1 row)
 
 -- rename back to keep the rest simple
@@ -351,19 +351,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -375,27 +375,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 -- fail - publication already exists
@@ -410,10 +410,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                                                                                               List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 -- fail - publication used more than once
@@ -428,10 +428,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -467,19 +467,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 -- we can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -489,10 +489,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -505,18 +505,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -529,10 +529,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -549,10 +549,10 @@ NOTICE:  max_retention_duration is ineffective when retain_dead_tuples is disabl
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                   1000 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                   1000 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 -- fail - max_retention_duration must be non-negative
@@ -561,12 +561,38 @@ ERROR:  max_retention_duration cannot be negative
 -- ok
 ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - message must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, message = foo);
+ERROR:  message requires a Boolean value
+-- ok
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, message = true);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+\dRs+
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | t       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+(1 row)
+
+-- ok
+ALTER SUBSCRIPTION regress_testsub SET (message = false);
+\dRs+
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
+-- cleanup
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 -- let's do some tests with pg_create_subscription rather than superuser
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 05533d66675..7eef3721a9a 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -423,6 +423,23 @@ ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail - message must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, message = foo);
+
+-- ok
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, message = true);
+
+\dRs+
+
+-- ok
+ALTER SUBSCRIPTION regress_testsub SET (message = false);
+
+\dRs+
+
+-- cleanup
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 -- let's do some tests with pg_create_subscription rather than superuser
 SET SESSION AUTHORIZATION regress_subscription_user3;
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 1969d467c1d..09c9b87d483 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1701,6 +1701,7 @@ LogicalRepBeginData
 LogicalRepCommitData
 LogicalRepCommitPreparedTxnData
 LogicalRepCtxStruct
+LogicalRepMessageData
 LogicalRepMsgType
 LogicalRepPartMapEntry
 LogicalRepPreparedTxnData
-- 
2.54.0

Reply via email to