On Tue, Mar 1, 2022 at 4:12 PM kuroda.hay...@fujitsu.com
<kuroda.hay...@fujitsu.com> wrote:
>
> Hi Vignesh,
>
> > In logical replication, currently Walsender sends the data that is
> > generated locally and the data that are replicated from other
> > instances. This results in infinite recursion in circular logical
> > replication setup.
>
> Thank you for good explanation. I understand that this fix can be used
> for a bidirectional replication.
>
> > Here there are two problems for the user: a) incremental
> > synchronization of table sending both local data and replicated data
> > by walsender b) Table synchronization of table using copy command
> > sending both local data and replicated data
>
> So you wanted to solve these two problem and currently focused on
> the first one, right? We can check one by one.
>
> > For the first problem "Incremental synchronization of table by
> > Walsender" can be solved by:
> > Currently the locally generated data does not have replication origin
> > associated and the data that has originated from another instance will
> > have a replication origin associated. We could use this information to
> > differentiate locally generated data and replicated data and send only
> > the locally generated data. This "only_local" could be provided as an
> > option while subscription is created:
> > ex: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname =postgres port=5433'
> > PUBLICATION pub1 with (only_local = on);
>
> Sounds good, but I cannot distinguish whether the assumption will keep.
>
> I played with your patch, but it could not be applied to current master.
> I tested from bd74c40 and I confirmed infinite loop was not appeared.
Rebased the patch on top of head

> local_only could not be set from ALTER SUBSCRIPTION command.
> Is it expected?
Modified

Thanks for the comments, the attached patch has the changes for the same.

Regards,
Vignesh
From 7c67cc23584e1106fbf2011c8c6658442125e48f Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Wed, 2 Mar 2022 20:40:34 +0530
Subject: [PATCH v2] Skip replication of non local data.

Add an option only_local which will subscribe only to the locally
generated data in the publisher node. If subscriber is created with this
option, publisher will skip publishing the data that was subscribed
from other nodes. It can be created using following syntax:
ex: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname =postgres port=9999' PUBLICATION pub1 with (only_local = on);
---
 contrib/test_decoding/test_decoding.c         |  13 +++
 doc/src/sgml/ref/alter_subscription.sgml      |   3 +-
 doc/src/sgml/ref/create_subscription.sgml     |  12 ++
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   3 +-
 src/backend/commands/subscriptioncmds.c       |  29 ++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |  18 ++-
 src/backend/replication/logical/decode.c      |  36 ++++--
 src/backend/replication/logical/logical.c     |  35 ++++++
 src/backend/replication/logical/tablesync.c   |   2 +-
 src/backend/replication/logical/worker.c      |   2 +
 src/backend/replication/pgoutput/pgoutput.c   |  25 ++++
 src/backend/replication/slot.c                |   4 +-
 src/backend/replication/slotfuncs.c           |  18 ++-
 src/backend/replication/walreceiver.c         |   2 +-
 src/backend/replication/walsender.c           |  21 +++-
 src/bin/psql/tab-complete.c                   |   4 +-
 src/include/catalog/pg_proc.dat               |   6 +-
 src/include/catalog/pg_subscription.h         |   3 +
 src/include/replication/logical.h             |   4 +
 src/include/replication/output_plugin.h       |   7 ++
 src/include/replication/pgoutput.h            |   1 +
 src/include/replication/slot.h                |   5 +-
 src/include/replication/walreceiver.h         |   8 +-
 src/test/regress/expected/rules.out           |   5 +-
 src/test/regress/expected/subscription.out    |   4 +
 src/test/regress/sql/subscription.sql         |   4 +
 src/test/subscription/t/029_circular.pl       | 108 ++++++++++++++++++
 src/tools/pgindent/typedefs.list              |   1 +
 29 files changed, 345 insertions(+), 39 deletions(-)
 create mode 100644 src/test/subscription/t/029_circular.pl

diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index ea22649e41..58bc5dbc1c 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -73,6 +73,8 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx,
 							   ReorderBufferChange *change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id);
+static bool pg_decode_filter_remotedata(LogicalDecodingContext *ctx,
+										RepOriginId origin_id);
 static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 							  bool transactional, const char *prefix,
@@ -148,6 +150,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->truncate_cb = pg_decode_truncate;
 	cb->commit_cb = pg_decode_commit_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
+	cb->filter_remotedata_cb = pg_decode_filter_remotedata;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
 	cb->sequence_cb = pg_decode_sequence;
@@ -484,6 +487,16 @@ pg_decode_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+static bool
+pg_decode_filter_remotedata(LogicalDecodingContext *ctx,
+							  RepOriginId origin_id)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->only_local && origin_id != InvalidRepOriginId)
+		return true;
+	return false;
+}
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 0d6f064f58..bd2ef19cce 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -204,7 +204,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       information.  The parameters that can be altered
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
-      <literal>binary</literal>, and
+      <literal>binary</literal>,
+      <literal>only_local</literal>, and
       <literal>streaming</literal>.
      </para>
     </listitem>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index e80a2617a3..b5552dc58c 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -152,6 +152,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
         </listitem>
        </varlistentry>
 
+       <varlistentry>
+        <term><literal>only_local</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription should subscribe only to the
+          locally generated changes or subscribe to both the locally generated
+          changes and the replicated data present from the publisher.  The
+          default is <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
        <varlistentry>
         <term><literal>slot_name</literal> (<type>string</type>)</term>
         <listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index ca65a8bd20..94e096e5fb 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->binary = subform->subbinary;
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
+	sub->onlylocaldata = subform->subonlylocaldata;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 40b7bca5a9..c7653298c5 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -958,7 +958,8 @@ CREATE VIEW pg_replication_slots AS
             L.confirmed_flush_lsn,
             L.wal_status,
             L.safe_wal_size,
-            L.two_phase
+            L.two_phase,
+            L.only_local
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3ef6607d24..5be0211ac3 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -61,6 +61,7 @@
 #define SUBOPT_BINARY				0x00000080
 #define SUBOPT_STREAMING			0x00000100
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
+#define SUBOPT_ONLYLOCAL_DATA 		0x00000400
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -82,6 +83,7 @@ typedef struct SubOpts
 	bool		binary;
 	bool		streaming;
 	bool		twophase;
+	bool		onlylocal_data;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->streaming = false;
 	if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
 		opts->twophase = false;
+	if (IsSet(supported_opts, SUBOPT_ONLYLOCAL_DATA))
+		opts->onlylocal_data = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -228,6 +232,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_STREAMING;
 			opts->streaming = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_ONLYLOCAL_DATA) &&
+				 strcmp(defel->defname, "only_local") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_ONLYLOCAL_DATA))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_ONLYLOCAL_DATA;
+			opts->onlylocal_data = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "two_phase") == 0)
 		{
 			/*
@@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
+					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+					  SUBOPT_ONLYLOCAL_DATA);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -460,6 +474,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
 	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
+	values[Anum_pg_subscription_subonlylocaldata - 1] = BoolGetDatum(opts.onlylocal_data);
 	values[Anum_pg_subscription_subtwophasestate - 1] =
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
@@ -565,7 +580,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					twophase_enabled = true;
 
 				walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
-								   CRS_NOEXPORT_SNAPSHOT, NULL);
+								   CRS_NOEXPORT_SNAPSHOT, NULL,
+								   opts.onlylocal_data);
 
 				if (twophase_enabled)
 					UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
@@ -864,7 +880,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			{
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-								  SUBOPT_STREAMING);
+								  SUBOPT_STREAMING | SUBOPT_ONLYLOCAL_DATA);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -913,6 +929,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_substream - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_ONLYLOCAL_DATA))
+				{
+					values[Anum_pg_subscription_subonlylocaldata - 1] =
+						BoolGetDatum(opts.streaming);
+					replaces[Anum_pg_subscription_subonlylocaldata - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 0d89db4e6a..326f60414e 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -75,7 +75,8 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
 								  bool temporary,
 								  bool two_phase,
 								  CRSSnapshotAction snapshot_action,
-								  XLogRecPtr *lsn);
+								  XLogRecPtr *lsn,
+								  bool onlylocal_data);
 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const char *query,
@@ -453,6 +454,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			PQserverVersion(conn->streamConn) >= 150000)
 			appendStringInfoString(&cmd, ", two_phase 'on'");
 
+		if (options->proto.logical.onlylocal_data &&
+			PQserverVersion(conn->streamConn) >= 150000)
+			appendStringInfoString(&cmd, ", only_local 'on'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
@@ -869,7 +874,7 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
 static char *
 libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 					 bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
-					 XLogRecPtr *lsn)
+					 XLogRecPtr *lsn, bool onlylocal_data)
 {
 	PGresult   *res;
 	StringInfoData cmd;
@@ -899,6 +904,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 				appendStringInfoChar(&cmd, ' ');
 		}
 
+		if (onlylocal_data)
+		{
+			appendStringInfoString(&cmd, "ONLY_LOCAL");
+			if (use_new_options_syntax)
+				appendStringInfoString(&cmd, ", ");
+			else
+				appendStringInfoChar(&cmd, ' ');
+		}
+
 		if (use_new_options_syntax)
 		{
 			switch (snapshot_action)
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 18cf931822..6305b93fc7 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -555,6 +555,15 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return filter_by_origin_cb_wrapper(ctx, origin_id);
 }
 
+static inline bool
+FilterRemoteOriginData(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+	if (ctx->callbacks.filter_remotedata_cb == NULL)
+		return false;
+
+	return filter_remotedata_cb_wrapper(ctx, origin_id);
+}
+
 /*
  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
  */
@@ -585,7 +594,8 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	message = (xl_logical_message *) XLogRecGetData(r);
 
 	if (message->dbId != ctx->slot->data.database ||
-		FilterByOrigin(ctx, origin_id))
+		FilterByOrigin(ctx, origin_id) ||
+		FilterRemoteOriginData(ctx, origin_id))
 		return;
 
 	if (message->transactional &&
@@ -864,7 +874,8 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -914,7 +925,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -980,7 +992,8 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -1032,7 +1045,8 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -1082,7 +1096,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	/*
@@ -1175,7 +1190,8 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	change = ReorderBufferGetChange(ctx->reorder);
@@ -1250,7 +1266,8 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 {
 	return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
 			(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
-			ctx->fast_forward || FilterByOrigin(ctx, origin_id));
+			ctx->fast_forward || FilterByOrigin(ctx, origin_id) ||
+			FilterRemoteOriginData(ctx, origin_id));
 }
 
 /*
@@ -1335,7 +1352,8 @@ sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		return;
 
 	/* output plugin doesn't look for this origin, no need to queue */
-	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)) ||
+		FilterRemoteOriginData(ctx, XLogRecGetOrigin(r)))
 		return;
 
 	tupledata = XLogRecGetData(r);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 934aa13f2d..19584eaea7 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -246,6 +246,8 @@ StartupDecodingContext(List *output_plugin_options,
 		(ctx->callbacks.stream_sequence_cb != NULL) ||
 		(ctx->callbacks.stream_truncate_cb != NULL);
 
+	ctx->onlylocal_data = ctx->callbacks.filter_remotedata_cb != NULL;
+
 	/*
 	 * streaming callbacks
 	 *
@@ -451,6 +453,8 @@ CreateInitDecodingContext(const char *plugin,
 	 */
 	ctx->twophase &= slot->data.two_phase;
 
+	ctx->onlylocal_data &= slot->data.onlylocal_data;
+
 	ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
 
 	return ctx;
@@ -1178,6 +1182,37 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+bool
+filter_remotedata_cb_wrapper(LogicalDecodingContext *ctx,
+							   RepOriginId origin_id)
+{
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	Assert(!ctx->fast_forward);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_remoteorigin";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_remotedata_cb(ctx, origin_id);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 static void
 message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				   XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 1659964571..f5093ce8c9 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1224,7 +1224,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	HOLD_INTERRUPTS();
 	walrcv_create_slot(LogRepWorkerWalRcvConn,
 					   slotname, false /* permanent */ , false /* two_phase */ ,
-					   CRS_USE_SNAPSHOT, origin_startpos);
+					   CRS_USE_SNAPSHOT, origin_startpos, false /* only_local */);
 	RESUME_INTERRUPTS();
 
 	/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7e267f7960..a13b8007e7 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2960,6 +2960,7 @@ maybe_reread_subscription(void)
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
 		newsub->owner != MySubscription->owner ||
+		newsub->onlylocaldata != MySubscription->onlylocaldata ||
 		!equal(newsub->publications, MySubscription->publications))
 	{
 		ereport(LOG,
@@ -3569,6 +3570,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.proto.logical.binary = MySubscription->binary;
 	options.proto.logical.streaming = MySubscription->stream;
 	options.proto.logical.twophase = false;
+	options.proto.logical.onlylocal_data = MySubscription->onlylocaldata;
 
 	if (!am_tablesync_worker())
 	{
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ea57a0477f..0c9b60bd65 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -55,6 +55,8 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static bool pgoutput_remoteorigin_filter(LogicalDecodingContext *ctx,
+										 RepOriginId origin_id);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn);
 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -215,6 +217,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
 	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
+	cb->filter_remotedata_cb = pgoutput_remoteorigin_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
 
 	/* transaction streaming */
@@ -239,11 +242,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		messages_option_given = false;
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
+	bool		onlylocal_data_given = false;
 
 	data->binary = false;
 	data->streaming = false;
 	data->messages = false;
 	data->two_phase = false;
+	data->onlylocal_data = false;
 
 	foreach(lc, options)
 	{
@@ -332,6 +337,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 
 			data->two_phase = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "only_local") == 0)
+		{
+			if (onlylocal_data_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			onlylocal_data_given = true;
+
+			data->onlylocal_data = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -1450,6 +1465,16 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+static bool
+pgoutput_remoteorigin_filter(LogicalDecodingContext *ctx,
+							 RepOriginId origin_id)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+	if (data->onlylocal_data && origin_id != InvalidRepOriginId)
+		return true;
+	return false;
+}
 /*
  * Shutdown the output plugin.
  *
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index caa6b29756..fed01829f3 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -253,7 +253,8 @@ ReplicationSlotValidateName(const char *name, int elevel)
  */
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
-					  ReplicationSlotPersistency persistency, bool two_phase)
+					  ReplicationSlotPersistency persistency, bool two_phase,
+					  bool onlylocal_data)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
@@ -313,6 +314,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	slot->data.persistency = persistency;
 	slot->data.two_phase = two_phase;
 	slot->data.two_phase_at = InvalidXLogRecPtr;
+	slot->data.onlylocal_data = onlylocal_data;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 886899afd2..0e0bc1e940 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -42,7 +42,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
 
 	/* acquire replication slot, this will check for conflicting names */
 	ReplicationSlotCreate(name, false,
-						  temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
+						  temporary ? RS_TEMPORARY : RS_PERSISTENT,
+						  false, false);
 
 	if (immediately_reserve)
 	{
@@ -118,7 +119,8 @@ static void
 create_logical_replication_slot(char *name, char *plugin,
 								bool temporary, bool two_phase,
 								XLogRecPtr restart_lsn,
-								bool find_startpoint)
+								bool find_startpoint,
+								bool onlylocal_data)
 {
 	LogicalDecodingContext *ctx = NULL;
 
@@ -133,7 +135,8 @@ create_logical_replication_slot(char *name, char *plugin,
 	 * error as well.
 	 */
 	ReplicationSlotCreate(name, true,
-						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
+						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
+						  onlylocal_data);
 
 	/*
 	 * Create logical decoding context to find start point or, if we don't
@@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	Name		plugin = PG_GETARG_NAME(1);
 	bool		temporary = PG_GETARG_BOOL(2);
 	bool		two_phase = PG_GETARG_BOOL(3);
+	bool		onlylocal_data = PG_GETARG_BOOL(4);
 	Datum		result;
 	TupleDesc	tupdesc;
 	HeapTuple	tuple;
@@ -189,7 +193,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 									temporary,
 									two_phase,
 									InvalidXLogRecPtr,
-									true);
+									true,
+									onlylocal_data);
 
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -231,7 +236,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 14
+#define PG_GET_REPLICATION_SLOTS_COLS 15
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -429,6 +434,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 
 		values[i++] = BoolGetDatum(slot_contents.data.two_phase);
 
+		values[i++] = BoolGetDatum(slot_contents.data.onlylocal_data);
+
 		Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -794,6 +801,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 										temporary,
 										false,
 										src_restart_lsn,
+										false,
 										false);
 	}
 	else
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ceaff097b9..cfdefb1f22 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -374,7 +374,7 @@ WalReceiverMain(void)
 					 "pg_walreceiver_%lld",
 					 (long long int) walrcv_get_backend_pid(wrconn));
 
-			walrcv_create_slot(wrconn, slotname, true, false, 0, NULL);
+			walrcv_create_slot(wrconn, slotname, true, false, 0, NULL, false);
 
 			SpinLockAcquire(&walrcv->mutex);
 			strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 5a718b1fe9..b826326b98 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -963,12 +963,14 @@ static void
 parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
 						   bool *reserve_wal,
 						   CRSSnapshotAction *snapshot_action,
-						   bool *two_phase)
+						   bool *two_phase,
+						   bool *onlylocal_data)
 {
 	ListCell   *lc;
 	bool		snapshot_action_given = false;
 	bool		reserve_wal_given = false;
 	bool		two_phase_given = false;
+	bool		onlylocal_data_given = false;
 
 	/* Parse options */
 	foreach(lc, cmd->options)
@@ -1019,6 +1021,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
 			two_phase_given = true;
 			*two_phase = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "only_local") == 0)
+		{
+			if (onlylocal_data_given || cmd->kind != REPLICATION_KIND_LOGICAL)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			onlylocal_data_given = true;
+			*onlylocal_data = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized option: %s", defel->defname);
 	}
@@ -1035,6 +1046,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	char	   *slot_name;
 	bool		reserve_wal = false;
 	bool		two_phase = false;
+	bool		onlylocal_data = false;
 	CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
 	DestReceiver *dest;
 	TupOutputState *tstate;
@@ -1044,13 +1056,14 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
+	parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
+							   &onlylocal_data);
 
 	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 	{
 		ReplicationSlotCreate(cmd->slotname, false,
 							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
-							  false);
+							  false, false);
 	}
 	else
 	{
@@ -1065,7 +1078,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-							  two_phase);
+							  two_phase, onlylocal_data);
 	}
 
 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 6957567264..02a1f96d1e 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1834,7 +1834,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit");
+		COMPLETE_WITH("binary", "only_local", "slot_name", "streaming", "synchronous_commit");
 	/* ALTER SUBSCRIPTION <name> SET PUBLICATION */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "PUBLICATION"))
 	{
@@ -3104,7 +3104,7 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
 					  "enabled", "slot_name", "streaming",
-					  "synchronous_commit", "two_phase");
+					  "synchronous_commit", "two_phase", "only_local");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index bf88858171..74a5f2ac0b 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10776,9 +10776,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,only_local}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 18c291289f..6e3119247c 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -65,6 +65,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		substream;		/* Stream in-progress transactions. */
 
+	bool		subonlylocaldata; /* skip copying of remote origin data */
+
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
@@ -102,6 +104,7 @@ typedef struct Subscription
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
+	bool		onlylocaldata;	/* Skip copying of remote orging data */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 1097cc9799..82014fe252 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -99,6 +99,8 @@ typedef struct LogicalDecodingContext
 	 */
 	bool		twophase_opt_given;
 
+	bool 		onlylocal_data;
+
 	/*
 	 * State for writing output.
 	 */
@@ -138,6 +140,8 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern bool filter_remotedata_cb_wrapper(LogicalDecodingContext *ctx,
+										 RepOriginId origin_id);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index a16bebf76c..52b5de3eb8 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,12 @@ typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
 typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
 											   RepOriginId origin_id);
 
+/*
+ * Filter remote origin changes.
+ */
+typedef bool (*LogicalDecodeFilterRemoteOriginCB) (struct LogicalDecodingContext *ctx,
+												   RepOriginId origin_id);
+
 /*
  * Called to shutdown an output plugin.
  */
@@ -246,6 +252,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeSequenceCB sequence_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+	LogicalDecodeFilterRemoteOriginCB filter_remotedata_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
 	/* streaming of changes at prepare time */
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index eafedd610a..e8fac6b3f8 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -29,6 +29,7 @@ typedef struct PGOutputData
 	bool		streaming;
 	bool		messages;
 	bool		two_phase;
+	bool		onlylocal_data;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 24b30210c3..833d380b0f 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -94,6 +94,8 @@ typedef struct ReplicationSlotPersistentData
 	 */
 	bool		two_phase;
 
+	bool		onlylocal_data;
+
 	/* plugin name */
 	NameData	plugin;
 } ReplicationSlotPersistentData;
@@ -195,7 +197,8 @@ extern void ReplicationSlotsShmemInit(void);
 
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
-								  ReplicationSlotPersistency p, bool two_phase);
+								  ReplicationSlotPersistency p, bool two_phase,
+								  bool onlylocal_data);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 92f73a55b8..e62dca9b45 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -183,6 +183,7 @@ typedef struct
 			bool		streaming;	/* Streaming of large transactions */
 			bool		twophase;	/* Streaming of two-phase transactions at
 									 * prepare time */
+			bool		onlylocal_data;
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
@@ -351,7 +352,8 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
 										bool temporary,
 										bool two_phase,
 										CRSSnapshotAction snapshot_action,
-										XLogRecPtr *lsn);
+										XLogRecPtr *lsn,
+										bool onlylocal_data);
 
 /*
  * walrcv_get_backend_pid_fn
@@ -423,8 +425,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
 #define walrcv_send(conn, buffer, nbytes) \
 	WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
-#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
-	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
+#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn, onlylocal_data) \
+	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn, onlylocal_data)
 #define walrcv_get_backend_pid(conn) \
 	WalReceiverFunctions->walrcv_get_backend_pid(conn)
 #define walrcv_exec(conn, exec, nRetTypes, retTypes) \
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index ac468568a1..d4a2ec85e7 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1456,8 +1456,9 @@ pg_replication_slots| SELECT l.slot_name,
     l.confirmed_flush_lsn,
     l.wal_status,
     l.safe_wal_size,
-    l.two_phase
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
+    l.two_phase,
+    l.only_local
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, only_local)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 80aae83562..dbf75d8b17 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -70,7 +70,11 @@ ALTER SUBSCRIPTION regress_testsub3 ENABLE;
 ERROR:  cannot enable subscription that does not have a slot name
 ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
 ERROR:  ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions
+-- ok - with only_local = true
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, only_local = true);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index bd0f4af1e4..3b65c42142 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -54,7 +54,11 @@ CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PU
 ALTER SUBSCRIPTION regress_testsub3 ENABLE;
 ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
 
+-- ok - with only_local = true
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, only_local = true);
+
 DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
 
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
diff --git a/src/test/subscription/t/029_circular.pl b/src/test/subscription/t/029_circular.pl
new file mode 100644
index 0000000000..553635ae5d
--- /dev/null
+++ b/src/test/subscription/t/029_circular.pl
@@ -0,0 +1,108 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test circular logical replication.
+#
+# Includes tests for circulation replication using only_local option.
+#
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###################################################
+# Setup a circulation replication of pub/sub nodes.
+# node_A -> node_B -> node_A
+###################################################
+
+# Initialize nodes
+# node_A
+my $node_A = PostgreSQL::Test::Cluster->new('node_A');
+$node_A->init(allows_streaming => 'logical');
+$node_A->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_A->start;
+# node_B
+my $node_B = PostgreSQL::Test::Cluster->new('node_B');
+$node_B->init(allows_streaming => 'logical');
+$node_B->append_conf('postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_B->start;
+
+# Create tables on node_A
+$node_A->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Create the same tables on node_B
+$node_B->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_A FOR TABLE tab_full");
+my $appname_B = 'tap_sub_B';
+$node_B->safe_psql('postgres',	"
+	CREATE SUBSCRIPTION tap_sub_B
+	CONNECTION '$node_A_connstr application_name=$appname_B'
+	PUBLICATION tap_pub_A
+	WITH (only_local = on)");
+
+# node_B (pub) -> node_A (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_B FOR TABLE tab_full");
+my $appname_A = 'tap_sub_A';
+$node_A->safe_psql('postgres',	"
+	CREATE SUBSCRIPTION tap_sub_A
+	CONNECTION '$node_B_connstr application_name=$appname_A'
+	PUBLICATION tap_pub_B
+	WITH (only_local = on)");
+
+# Wait for subscribers to finish initialization
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_A);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_A->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+$node_B->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+is(1,1, "Circular replication setup is complete");
+
+my $result;
+
+##########################################################################
+# check that circular replication setup does not cause infinite recursive
+# insertion.
+##########################################################################
+
+# insert a record
+$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);");
+$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);");
+
+$node_A->wait_for_catchup($appname_B);
+$node_B->wait_for_catchup($appname_A);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full;");
+is($result, qq(11
+12), 'Inserted successfully without leading to infinite recursion in circular replication setup');
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full;");
+is($result, qq(11
+12), 'Inserted successfully without leading to infinite recursion in circular replication setup');
+
+# shutdown
+$node_B->stop('fast');
+$node_A->stop('fast');
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d9b83f744f..9608cb7bbc 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1370,6 +1370,7 @@ LogicalDecodeCommitCB
 LogicalDecodeCommitPreparedCB
 LogicalDecodeFilterByOriginCB
 LogicalDecodeFilterPrepareCB
+LogicalDecodeFilterRemoteOriginCB
 LogicalDecodeMessageCB
 LogicalDecodePrepareCB
 LogicalDecodeRollbackPreparedCB
-- 
2.32.0

Reply via email to