On Tue, Apr 19, 2022 at 8:29 AM Peter Smith <smithpb2...@gmail.com> wrote:
>
> I checked the latest v9-0001 patch. Below are my review comments.
>
> Other than these few trivial comments this 0001 patch looks good to me.
>
> ~~~
>
> 1. src/backend/replication/pgoutput/pgoutput.c - whitespace
>
> @@ -1696,6 +1714,10 @@ static bool
>  pgoutput_origin_filter(LogicalDecodingContext *ctx,
>      RepOriginId origin_id)
>  {
> + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
> +
> + if (data->local_only && origin_id != InvalidRepOriginId)
> + return true;
>   return false;
>  }
>
> Suggest to add a blank line after the return true;

Modified

> ~~~
>
> 2. src/bin/psql/tab-complete.c - not alphabetical
>
> @@ -1874,7 +1874,7 @@ psql_completion(const char *text, int start, int end)
>   COMPLETE_WITH("(", "PUBLICATION");
>   /* ALTER SUBSCRIPTION <name> SET ( */
>   else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) &&
> TailMatches("SET", "("))
> - COMPLETE_WITH("binary", "slot_name", "streaming",
> "synchronous_commit", "disable_on_error");
> + COMPLETE_WITH("binary", "slot_name", "streaming", "local_only",
> "synchronous_commit", "disable_on_error");
>
> 2a. AFAIK the code intended that these options be listed in
> alphabetical order (I think the recent addition of disable_on_error is
> also wrong here). So "local_only" should be moved.

Modified

> @@ -3156,7 +3156,7 @@ psql_completion(const char *text, int start, int end)
>   /* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
>   else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
>   COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
> -   "enabled", "slot_name", "streaming",
> +   "enabled", "slot_name", "streaming", "local_only",
>     "synchronous_commit", "two_phase", "disable_on_error");
>
> 2b. ditto

Modified

> ~~~
>
> 3. src/test/subscription/t/032_localonly.pl - wrong message
>
> +$node_C->wait_for_catchup($appname_B2);
> +$node_B->wait_for_catchup($appname_A);
> +
> +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full;");
> +is( $result, qq(11
> +12
> +13),
> + 'Inserted successfully without leading to infinite recursion in
> circular replication setup'
> +);
> +
> +# check that the data published from node_C to node_B is not sent to node_A
> +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full;");
> +is( $result, qq(11
> +12),
> + 'Inserted successfully without leading to infinite recursion in
> circular replication setup'
> +);
> +
>
> The new test looked good, but the cut/paste text message ('Inserted
> successfully without leading to infinite recursion in circular
> replication setup') maybe needs changing because there is nothing
> really "circular" about this test case.

Modified

The attached v10 patch includes the changes for these comments.

Regards,
Vignesh
From 20a6e0f29a2abc63c141bdf72930011358d7b2db Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Fri, 8 Apr 2022 11:10:05 +0530
Subject: [PATCH v10 1/2] Skip replication of non-local data.

This patch adds a new SUBSCRIPTION boolean option
"local_only". The default is false. When a SUBSCRIPTION is
created with this option enabled, the publisher will only publish data
that originated at the publisher node.
Usage:
CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres port=9999'
PUBLICATION pub1 WITH (local_only = true);
---
 doc/src/sgml/ref/alter_subscription.sgml      |   5 +-
 doc/src/sgml/ref/create_subscription.sgml     |  12 ++
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   4 +-
 src/backend/commands/subscriptioncmds.c       |  26 ++-
 .../libpqwalreceiver/libpqwalreceiver.c       |   5 +
 src/backend/replication/logical/worker.c      |   2 +
 src/backend/replication/pgoutput/pgoutput.c   |  23 +++
 src/bin/pg_dump/pg_dump.c                     |  17 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   8 +-
 src/bin/psql/tab-complete.c                   |   4 +-
 src/include/catalog/pg_subscription.h         |   3 +
 src/include/replication/logicalproto.h        |  10 +-
 src/include/replication/pgoutput.h            |   1 +
 src/include/replication/walreceiver.h         |   1 +
 src/test/regress/expected/subscription.out    | 142 ++++++++-------
 src/test/regress/sql/subscription.sql         |  10 ++
 src/test/subscription/t/032_localonly.pl      | 166 ++++++++++++++++++
 19 files changed, 369 insertions(+), 72 deletions(-)
 create mode 100644 src/test/subscription/t/032_localonly.pl

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 353ea5def2..c5ebcf5500 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -207,8 +207,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       information.  The parameters that can be altered
       are <literal>slot_name</literal>,
       <literal>synchronous_commit</literal>,
-      <literal>binary</literal>, <literal>streaming</literal>, and
-      <literal>disable_on_error</literal>.
+      <literal>binary</literal>, <literal>streaming</literal>,
+      <literal>disable_on_error</literal>, and
+      <literal>local_only</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 203bb41844..c09f7b0600 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -152,6 +152,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
         </listitem>
        </varlistentry>
 
+       <varlistentry>
+        <term><literal>local_only</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the subscription will request the publisher to send
+          only changes that originated at the publisher node, or to send all
+          publisher node changes regardless of their origin. The default is
+          <literal>false</literal>.
+         </para>
+        </listitem>
+       </varlistentry>
+
        <varlistentry>
         <term><literal>slot_name</literal> (<type>string</type>)</term>
         <listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index add51caadf..e3d13ffb1c 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->stream = subform->substream;
 	sub->twophasestate = subform->subtwophasestate;
 	sub->disableonerr = subform->subdisableonerr;
+	sub->local_only = subform->sublocalonly;
 
 	/* Get conninfo */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 0fc614e32c..4cc4a60005 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1290,8 +1290,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 -- All columns of pg_subscription except subconninfo are publicly readable.
 REVOKE ALL ON pg_subscription FROM public;
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
-              subbinary, substream, subtwophasestate, subdisableonerr, subslotname,
-              subsynccommit, subpublications)
+              subbinary, substream, subtwophasestate, subdisableonerr,
+              sublocalonly, subslotname, subsynccommit, subpublications)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b94236f74d..21fd805a81 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -64,6 +64,7 @@
 #define SUBOPT_TWOPHASE_COMMIT		0x00000200
 #define SUBOPT_DISABLE_ON_ERR		0x00000400
 #define SUBOPT_LSN					0x00000800
+#define SUBOPT_LOCAL_ONLY			0x00001000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -86,6 +87,7 @@ typedef struct SubOpts
 	bool		streaming;
 	bool		twophase;
 	bool		disableonerr;
+	bool		local_only;
 	XLogRecPtr	lsn;
 } SubOpts;
 
@@ -137,6 +139,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->twophase = false;
 	if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
 		opts->disableonerr = false;
+	if (IsSet(supported_opts, SUBOPT_LOCAL_ONLY))
+		opts->local_only = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -235,6 +239,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_STREAMING;
 			opts->streaming = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_LOCAL_ONLY) &&
+				 strcmp(defel->defname, "local_only") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_LOCAL_ONLY))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_LOCAL_ONLY;
+			opts->local_only = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "two_phase") == 0)
 		{
 			/*
@@ -531,7 +544,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
 					  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
 					  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
-					  SUBOPT_DISABLE_ON_ERR);
+					  SUBOPT_DISABLE_ON_ERR | SUBOPT_LOCAL_ONLY);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -602,6 +615,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
 	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
 	values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
+	values[Anum_pg_subscription_sublocalonly - 1] = BoolGetDatum(opts.local_only);
 	values[Anum_pg_subscription_subtwophasestate - 1] =
 		CharGetDatum(opts.twophase ?
 					 LOGICALREP_TWOPHASE_STATE_PENDING :
@@ -1015,7 +1029,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			{
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);
+								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+								  SUBOPT_LOCAL_ONLY);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1072,6 +1087,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 						= true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_LOCAL_ONLY))
+				{
+					values[Anum_pg_subscription_sublocalonly - 1] =
+						BoolGetDatum(opts.local_only);
+					replaces[Anum_pg_subscription_sublocalonly - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 0d89db4e6a..0072f5f101 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -453,6 +453,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			PQserverVersion(conn->streamConn) >= 150000)
 			appendStringInfoString(&cmd, ", two_phase 'on'");
 
+		/* FIXME: 150000 should be changed to 160000 later for PG16. */
+		if (options->proto.logical.local_only &&
+			PQserverVersion(conn->streamConn) >= 150000)
+			appendStringInfoString(&cmd, ", local_only 'on'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
 		if (!pubnames_str)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 4171371296..9ccaa255c0 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3055,6 +3055,7 @@ maybe_reread_subscription(void)
 		newsub->binary != MySubscription->binary ||
 		newsub->stream != MySubscription->stream ||
 		newsub->owner != MySubscription->owner ||
+		newsub->local_only != MySubscription->local_only ||
 		!equal(newsub->publications, MySubscription->publications))
 	{
 		ereport(LOG,
@@ -3735,6 +3736,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.proto.logical.binary = MySubscription->binary;
 	options.proto.logical.streaming = MySubscription->stream;
 	options.proto.logical.twophase = false;
+	options.proto.logical.local_only = MySubscription->local_only;
 
 	if (!am_tablesync_worker())
 	{
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b197bfd565..a081395823 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -286,11 +286,13 @@ parse_output_parameters(List *options, PGOutputData *data)
 	bool		messages_option_given = false;
 	bool		streaming_given = false;
 	bool		two_phase_option_given = false;
+	bool		local_only_option_given = false;
 
 	data->binary = false;
 	data->streaming = false;
 	data->messages = false;
 	data->two_phase = false;
+	data->local_only = false;
 
 	foreach(lc, options)
 	{
@@ -379,6 +381,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 
 			data->two_phase = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "local_only") == 0)
+		{
+			if (local_only_option_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			local_only_option_given = true;
+
+			data->local_only = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
 	}
@@ -477,6 +489,12 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 		else
 			ctx->twophase_opt_given = true;
 
+		if (data->local_only && data->protocol_version < LOGICALREP_PROTO_LOCALONLY_VERSION_NUM)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("requested proto_version=%d does not support local_only, need %d or higher",
+							data->protocol_version, LOGICALREP_PROTO_LOCALONLY_VERSION_NUM)));
+
 		/* Init publication state. */
 		data->publications = NIL;
 		publications_valid = false;
@@ -1696,6 +1714,11 @@ static bool
 pgoutput_origin_filter(LogicalDecodingContext *ctx,
 					   RepOriginId origin_id)
 {
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
+	if (data->local_only && origin_id != InvalidRepOriginId)
+		return true;
+
 	return false;
 }
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 786d592e2b..904cb4b0d5 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4410,6 +4410,7 @@ getSubscriptions(Archive *fout)
 	int			i_subsynccommit;
 	int			i_subpublications;
 	int			i_subbinary;
+	int			i_sublocalonly;
 	int			i,
 				ntups;
 
@@ -4454,13 +4455,19 @@ getSubscriptions(Archive *fout)
 	if (fout->remoteVersion >= 150000)
 		appendPQExpBufferStr(query,
 							 " s.subtwophasestate,\n"
-							 " s.subdisableonerr\n");
+							 " s.subdisableonerr,\n");
 	else
 		appendPQExpBuffer(query,
 						  " '%c' AS subtwophasestate,\n"
-						  " false AS subdisableonerr\n",
+						  " false AS subdisableonerr,\n",
 						  LOGICALREP_TWOPHASE_STATE_DISABLED);
 
+	/* FIXME: 150000 should be changed to 160000 later for PG16. */
+	if (fout->remoteVersion >= 150000)
+		appendPQExpBufferStr(query, " s.sublocalonly\n");
+	else
+		appendPQExpBufferStr(query, " false AS sublocalonly\n");
+
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
 						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
@@ -4486,6 +4493,7 @@ getSubscriptions(Archive *fout)
 	i_substream = PQfnumber(res, "substream");
 	i_subtwophasestate = PQfnumber(res, "subtwophasestate");
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
+	i_sublocalonly = PQfnumber(res, "sublocalonly");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4515,6 +4523,8 @@ getSubscriptions(Archive *fout)
 			pg_strdup(PQgetvalue(res, i, i_subtwophasestate));
 		subinfo[i].subdisableonerr =
 			pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
+		subinfo[i].sublocalonly =
+			pg_strdup(PQgetvalue(res, i, i_sublocalonly));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4588,6 +4598,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (strcmp(subinfo->subdisableonerr, "t") == 0)
 		appendPQExpBufferStr(query, ", disable_on_error = true");
 
+	if (strcmp(subinfo->sublocalonly, "t") == 0)
+		appendPQExpBufferStr(query, ", local_only = true");
+
 	if (strcmp(subinfo->subsynccommit, "off") != 0)
 		appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 1d21c2906f..545b76ba5e 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -661,6 +661,7 @@ typedef struct _SubscriptionInfo
 	char	   *subdisableonerr;
 	char	   *subsynccommit;
 	char	   *subpublications;
+	char	   *sublocalonly;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 4369f2235b..782ab26bac 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6354,7 +6354,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
-	false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6396,6 +6396,12 @@ describeSubscriptions(const char *pattern, bool verbose)
 							  gettext_noop("Two phase commit"),
 							  gettext_noop("Disable on error"));
 
+		/* FIXME: 150000 should be changed to 160000 later for PG16 */
+		if (pset.sversion >= 150000)
+			appendPQExpBuffer(&buf,
+							  ", sublocalonly AS \"%s\"\n",
+							  gettext_noop("Local only"));
+
 		appendPQExpBuffer(&buf,
 						  ",  subsynccommit AS \"%s\"\n"
 						  ",  subconninfo AS \"%s\"\n",
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 588c0841fe..1c0f43f44e 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1874,7 +1874,7 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
-		COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error");
+		COMPLETE_WITH("binary", "local_only", "slot_name", "streaming", "synchronous_commit", "disable_on_error");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
@@ -3156,7 +3156,7 @@ psql_completion(const char *text, int start, int end)
 	/* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
 	else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
-					  "enabled", "slot_name", "streaming",
+					  "enabled", "local_only", "slot_name", "streaming",
 					  "synchronous_commit", "two_phase", "disable_on_error");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index d1260f590c..aaf272c11c 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -70,6 +70,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		substream;		/* Stream in-progress transactions. */
 
+	bool		sublocalonly;		/* Request only locally originated changes */
+
 	char		subtwophasestate;	/* Stream two-phase transactions */
 
 	bool		subdisableonerr;	/* True if a worker error should cause the
@@ -110,6 +112,7 @@ typedef struct Subscription
 	bool		binary;			/* Indicates if the subscription wants data in
 								 * binary format */
 	bool		stream;			/* Allow streaming in-progress transactions. */
+	bool		local_only;		/* Request only locally originated changes */
 	char		twophasestate;	/* Allow streaming two-phase transactions */
 	bool		disableonerr;	/* Indicates if the subscription should be
 								 * automatically disabled if a worker error
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index a771ab8ff3..ed6c5de23c 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -32,12 +32,20 @@
  *
  * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with
  * support for two-phase commit decoding (at prepare time). Introduced in PG15.
+ *
+ * LOGICALREP_PROTO_LOCALONLY_VERSION_NUM is the minimum protocol version with
+ * support for sending only locally originated data from the publisher.
+ * Introduced in PG16.
+ *
+ * FIXME: LOGICALREP_PROTO_LOCALONLY_VERSION_NUM needs to be bumped to 4 in
+ * PG16.
  */
 #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
 #define LOGICALREP_PROTO_VERSION_NUM 1
 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
 #define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
-#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
+#define LOGICALREP_PROTO_LOCALONLY_VERSION_NUM 3
+#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_LOCALONLY_VERSION_NUM
 
 /*
  * Logical message types
diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h
index eafedd610a..eb7859c445 100644
--- a/src/include/replication/pgoutput.h
+++ b/src/include/replication/pgoutput.h
@@ -29,6 +29,7 @@ typedef struct PGOutputData
 	bool		streaming;
 	bool		messages;
 	bool		two_phase;
+	bool		local_only;
 } PGOutputData;
 
 #endif							/* PGOUTPUT_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 81184aa92f..10b860f74e 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -183,6 +183,7 @@ typedef struct
 			bool		streaming;	/* Streaming of large transactions */
 			bool		twophase;	/* Streaming of two-phase transactions at
 									 * prepare time */
+			bool		local_only; /* publish only locally originated data */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 7fcfad1591..8bf7c810a5 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -70,16 +70,38 @@ ALTER SUBSCRIPTION regress_testsub3 ENABLE;
 ERROR:  cannot enable subscription that does not have a slot name
 ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
 ERROR:  ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions
+-- fail - local_only must be boolean
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, local_only = foo);
+ERROR:  local_only requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, local_only = true);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+ regress_testsub4
+                                                                                           List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | t          | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub4 SET (local_only = false);
+\dRs+ regress_testsub4
+                                                                                           List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
+(1 row)
+
 DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -96,10 +118,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2 | 0/12345
+                                                                                               List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist2 | 0/12345
 (1 row)
 
 -- ok - with lsn = NONE
@@ -108,10 +130,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist2 | 0/0
+                                                                                               List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |           Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 BEGIN;
@@ -143,10 +165,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                           List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |           Conninfo           | Skip LSN 
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+----------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | local              | dbname=regress_doesnotexist2 | 0/0
+                                                                                                 List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |           Conninfo           | Skip LSN 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------+----------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | d                | f                | f          | local              | dbname=regress_doesnotexist2 | 0/0
 (1 row)
 
 -- rename back to keep the rest simple
@@ -179,19 +201,19 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -202,19 +224,19 @@ ERROR:  streaming requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication already exists
@@ -229,10 +251,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                            List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                                   List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 -- fail - publication used more then once
@@ -247,10 +269,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -284,10 +306,10 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | p                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 --fail - alter of two_phase option not supported.
@@ -296,10 +318,10 @@ ERROR:  unrecognized subscription parameter: "two_phase"
 -- but can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -308,10 +330,10 @@ DROP SUBSCRIPTION regress_testsub;
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | p                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -323,18 +345,18 @@ ERROR:  disable_on_error requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | f                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                    List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit |          Conninfo           | Skip LSN 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+----------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | off                | dbname=regress_doesnotexist | 0/0
+                                                                                           List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Local only | Synchronous commit |          Conninfo           | Skip LSN 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+----------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | d                | t                | f          | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 74c38ead5d..327a1e2500 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -54,7 +54,17 @@ CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PU
 ALTER SUBSCRIPTION regress_testsub3 ENABLE;
 ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION;
 
+-- fail - local_only must be boolean
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, local_only = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, local_only = true);
+\dRs+ regress_testsub4
+ALTER SUBSCRIPTION regress_testsub4 SET (local_only = false);
+\dRs+ regress_testsub4
+
 DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
 
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
diff --git a/src/test/subscription/t/032_localonly.pl b/src/test/subscription/t/032_localonly.pl
new file mode 100644
index 0000000000..a8b2df27b6
--- /dev/null
+++ b/src/test/subscription/t/032_localonly.pl
@@ -0,0 +1,166 @@
+
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test logical replication using local_only option.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+###############################################################################
+# Setup a bidirectional logical replication between Node_A & Node_B
+###############################################################################
+
+# Initialize nodes
+# node_A
+my $node_A = PostgreSQL::Test::Cluster->new('node_A');
+$node_A->init(allows_streaming => 'logical');
+$node_A->append_conf(
+	'postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_A->start;
+# node_B
+my $node_B = PostgreSQL::Test::Cluster->new('node_B');
+$node_B->init(allows_streaming => 'logical');
+$node_B->append_conf(
+	'postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_B->start;
+
+# Create tables on node_A
+$node_A->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Create the same tables on node_B
+$node_B->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication
+# node_A (pub) -> node_B (sub)
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_A FOR TABLE tab_full");
+my $appname_B1 = 'tap_sub_B1';
+$node_B->safe_psql(
+	'postgres', "
+	CREATE SUBSCRIPTION tap_sub_B1
+	CONNECTION '$node_A_connstr application_name=$appname_B1'
+	PUBLICATION tap_pub_A
+	WITH (local_only = on)");
+
+# node_B (pub) -> node_A (sub)
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_B FOR TABLE tab_full");
+my $appname_A = 'tap_sub_A';
+$node_A->safe_psql(
+	'postgres', "
+	CREATE SUBSCRIPTION tap_sub_A
+	CONNECTION '$node_B_connstr application_name=$appname_A'
+	PUBLICATION tap_pub_B
+	WITH (local_only = on, copy_data = off)");
+
+# Wait for subscribers to finish initialization
+$node_A->wait_for_catchup($appname_B1);
+$node_B->wait_for_catchup($appname_A);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_A->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+$node_B->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+is(1, 1, "Circular replication setup is complete");
+
+my $result;
+
+###############################################################################
+# check that bidirectional logical replication setup does not cause infinite
+# recursive insertion.
+###############################################################################
+
+# insert a record
+$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);");
+$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);");
+
+$node_A->wait_for_catchup($appname_B1);
+$node_B->wait_for_catchup($appname_A);
+
+# check that transaction was committed on subscriber(s)
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full;");
+is( $result, qq(11
+12),
+	'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
+);
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full;");
+is( $result, qq(11
+12),
+	'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
+);
+
+###############################################################################
+# check that remote data that is originated from node_C to node_B is not
+# published to node_A
+###############################################################################
+# Initialize node node_C
+my $node_C = PostgreSQL::Test::Cluster->new('node_C');
+$node_C->init(allows_streaming => 'logical');
+$node_C->append_conf(
+	'postgresql.conf', qq(
+max_prepared_transactions = 10
+logical_decoding_work_mem = 64kB
+));
+$node_C->start;
+
+$node_C->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication
+# node_C (pub) -> node_B (sub)
+my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
+$node_C->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_C FOR TABLE tab_full");
+
+my $appname_B2 = 'tap_sub_B2';
+$node_B->safe_psql(
+	'postgres', "
+	CREATE SUBSCRIPTION tap_sub_B2
+	CONNECTION '$node_C_connstr application_name=$appname_B2'
+	PUBLICATION tap_pub_C
+	WITH (local_only = on)");
+
+$node_C->wait_for_catchup($appname_B2);
+# tap_sub_B2 lives on node_B, so its pg_subscription_rel sync state is there.
+$node_B->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# insert a record
+$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (13);");
+
+$node_C->wait_for_catchup($appname_B2);
+$node_B->wait_for_catchup($appname_A);
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full;");
+is( $result, qq(11
+12
+13),
+	'Node_C data replicated to Node_B'
+);
+
+# check that the data published from node_C to node_B is not sent to node_A
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full;");
+is( $result, qq(11
+12),
+	'Remote data originated from other node is not replicated when local_only option is ON'
+);
+
+# shutdown
+$node_B->stop('fast');
+$node_A->stop('fast');
+$node_C->stop('fast');
+
+done_testing();
-- 
2.32.0

From ad759c77e23ee49090260a36c7a9e633d2f844a3 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Fri, 22 Apr 2022 21:37:37 +0530
Subject: [PATCH v10 2/2] Support force option for copy_data, check and throw
 an error if publisher tables were also subscribing data in the publisher from
 other publishers.

This patch does a couple of things:
1) Added force option for copy_data.
2) Check and throw an error if the publication tables were also subscribing
data in the publisher from other publishers.

Let's consider an existing Multi master logical replication setup between
Node1 and Node2 that is created using the following steps:
a) Node1 - PUBLICATION pub1 for the employee table
b) Node2 - SUBSCRIPTION from pub1 with local_only=true
c) Node2 - PUBLICATION pub2 for the employee table
d) Node1 - SUBSCRIPTION from pub2 with local_only=true

Now when the user is trying to add another node Node3 to the
above Multi master logical replication setup:
a) the user will have to create one subscription subscribing from Node1 to
Node3
b) the user will have to create another subscription subscribing from
Node2 to Node3 using local_only option and copy_data as true.

Here the user has specified local_only 'on', which indicates that the
publisher should only replicate the changes that are generated locally. But in
this case the publisher node is also subscribing to data from other nodes, so
the publisher node can have remotely originated data. Throw an error in this
case to prevent remotely generated data being replicated to the subscriber. If
the user still intends to continue with the operation, the user can specify
copy_data as 'force' and proceed.

Handling of initial data copying in this case is detailed in
the documentation section of the patch.
---
 doc/src/sgml/logical-replication.sgml      | 264 ++++++++++++++++
 doc/src/sgml/ref/alter_subscription.sgml   |  14 +-
 doc/src/sgml/ref/create_subscription.sgml  |  30 +-
 src/backend/commands/subscriptioncmds.c    | 140 +++++++--
 src/test/regress/expected/subscription.out |  18 +-
 src/test/regress/sql/subscription.sql      |  12 +
 src/test/subscription/t/032_localonly.pl   | 338 ++++++++++++++++++---
 src/tools/pgindent/typedefs.list           |   1 +
 8 files changed, 738 insertions(+), 79 deletions(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 145ea71d61..f9ba543423 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1267,4 +1267,268 @@ CREATE SUBSCRIPTION mysub CONNECTION 'dbname=foo host=bar user=repuser' PUBLICAT
    incremental changes to those tables.
   </para>
  </sect1>
+
+ <sect1 id="bidirectional-logical-replication">
+  <title>Bidirectional logical replication</title>
+
+  <sect2 id="setting-bidirectional-replication-two-nodes">
+   <title>Setting bidirectional replication between two nodes</title>
+   <para>
+     Bidirectional replication is useful for creating a multi-master database
+     environment in which read/write operations can be performed on any node.
+     Setting up bidirectional logical replication between two nodes requires
+     creating a publication on each node and creating a subscription on each
+     node that subscribes to the publication of the other node. The steps to
+     create a two-node bidirectional replication setup are listed below:
+   </para>
+
+   <para>
+     Create the publication in node1:
+<programlisting>
+node1=# CREATE PUBLICATION pub_node1 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting>
+   </para>
+
+   <para>
+     Create the subscription in node2 to subscribe the changes from node1:
+<programlisting>
+node2=# CREATE SUBSCRIPTION sub_node1_node2
+node2=# CONNECTION 'dbname=foo host=node1 user=repuser'
+node2=# PUBLICATION pub_node1
+node2=# WITH (copy_data = off, local_only = on);
+CREATE SUBSCRIPTION
+</programlisting>
+   </para>
+
+   <para>
+     Create publication in node2:
+<programlisting>
+node2=# CREATE PUBLICATION pub_node2 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting>
+   </para>
+
+   <para>
+     Create subscription in node1 to subscribe the changes from node2:
+<programlisting>
+node1=# CREATE SUBSCRIPTION sub_node2_node1
+node1=# CONNECTION 'dbname=foo host=node2 user=repuser'
+node1=# PUBLICATION pub_node2
+node1=# WITH (copy_data = off, local_only = on);
+CREATE SUBSCRIPTION
+</programlisting>
+   </para>
+
+   <para>
+   Now the bidirectional logical replication setup is complete between node1
+   and node2. Any incremental changes from node1 will be replicated to node2
+   and the incremental changes from node2 will be replicated to node1.
+   </para>
+  </sect2>
+
+  <sect2 id="add-new-node">
+   <title>Adding new node when there is no data in any of the nodes</title>
+   <para>
+     Adding a new node node3 to the existing node1 and node2 requires setting
+     up subscriptions in node1 and node2 to replicate the data from node3 and
+     setting up subscriptions in node3 to replicate data from node1 and node2.
+     The steps are listed below:
+   </para>
+
+   <para>
+    Create publication in node3:
+<programlisting>
+node3=# CREATE PUBLICATION pub_node3 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting>
+   </para>
+
+   <para>
+    Create subscription in node1 to subscribe the changes from node3:
+<programlisting>
+node1=# CREATE SUBSCRIPTION sub_node1_node3
+node1=# CONNECTION 'dbname=foo host=node3 user=repuser'
+node1=# PUBLICATION pub_node3
+node1=# WITH (copy_data = off, local_only = on);
+CREATE SUBSCRIPTION
+</programlisting>
+   </para>
+
+   <para>
+    Create subscription in node2 to subscribe the changes from node3:
+<programlisting>
+node2=# CREATE SUBSCRIPTION sub_node2_node3
+node2=# CONNECTION 'dbname=foo host=node3 user=repuser'
+node2=# PUBLICATION pub_node3
+node2=# WITH (copy_data = off, local_only = on);
+CREATE SUBSCRIPTION
+</programlisting>
+   </para>
+
+   <para>
+        Create subscription in node3 to subscribe the changes from node1:
+<programlisting>
+node3=# CREATE SUBSCRIPTION sub_node3_node1
+node3=# CONNECTION 'dbname=foo host=node1 user=repuser'
+node3=# PUBLICATION pub_node1
+node3=# WITH (copy_data = off, local_only = on);
+CREATE SUBSCRIPTION
+</programlisting>
+   </para>
+
+   <para>
+        Create subscription in node3 to subscribe the changes from node2:
+<programlisting>
+node3=# CREATE SUBSCRIPTION sub_node3_node2
+node3=# CONNECTION 'dbname=foo host=node2 user=repuser'
+node3=# PUBLICATION pub_node2
+node3=# WITH (copy_data = off, local_only = on);
+CREATE SUBSCRIPTION
+</programlisting>
+   </para>
+  </sect2>
+
+  <sect2 id="add-new-node-data-in-existing-node">
+   <title>Adding new node when data is present in the existing nodes</title>
+    <para>
+     Adding a new node node3 to the existing node1 and node2 when data is
+     present in the existing nodes node1 and node2 needs similar steps. The only
+     change required is that node3 should create its subscription to one of the
+     existing nodes with copy_data = force, so that the existing data is
+     received during initial data synchronization. The steps are listed below:
+   </para>
+
+   <para>
+    Create publication in node3:
+<programlisting>
+node3=# CREATE PUBLICATION pub_node3 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting>
+   </para>
+
+   <para>
+    Create subscription in node1 to subscribe the changes from node3:
+<programlisting>
+node1=# CREATE SUBSCRIPTION sub_node1_node3
+node1=# CONNECTION 'dbname=foo host=node3 user=repuser'
+node1=# PUBLICATION pub_node3
+node1=# WITH (copy_data = off, local_only = on);
+CREATE SUBSCRIPTION
+</programlisting>
+   </para>
+
+   <para>
+    Create subscription in node2 to subscribe the changes from node3:
+<programlisting>
+node2=# CREATE SUBSCRIPTION sub_node2_node3
+node2=# CONNECTION 'dbname=foo host=node3 user=repuser'
+node2=# PUBLICATION pub_node3
+node2=# WITH (copy_data = off, local_only = on);
+CREATE SUBSCRIPTION
+</programlisting>
+   </para>
+
+   <para>
+    Create subscription in node3 to subscribe the changes from node1, here
+        copy_data is specified as force so that the existing table data is
+        copied during initial sync:
+<programlisting>
+node3=# CREATE SUBSCRIPTION sub_node3_node1
+node3=# CONNECTION 'dbname=foo host=node1 user=repuser'
+node3=# PUBLICATION pub_node1
+node3=# WITH (copy_data = force, local_only = on);
+CREATE SUBSCRIPTION
+</programlisting>
+   </para>
+
+   <para>
+    Create subscription in node3 to subscribe the changes from node2:
+<programlisting>
+node3=# CREATE SUBSCRIPTION sub_node3_node2
+node3=# CONNECTION 'dbname=foo host=node2 user=repuser'
+node3=# PUBLICATION pub_node2
+node3=# WITH (copy_data = off, local_only = on);
+CREATE SUBSCRIPTION
+</programlisting>
+   </para>
+  </sect2>
+
+  <sect2 id="add-node-data-present-in-new-node">
+   <title>Adding new node when data is present in the new node</title>
+   <para>
+     Adding a new node node3 to the existing node1 and node2 when data is
+     present in the new node node3 needs similar steps. A few changes are
+     required to copy the existing data from node3 to node1 and node2, and
+     then to clean up the data in node3 before synchronizing all the data
+     from the existing nodes. The steps are listed below:
+   </para>
+
+   <para>
+    Create publication in node3:
+<programlisting>
+node3=# CREATE PUBLICATION pub_node3 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting>
+   </para>
+
+   <para>
+    Create subscription in node1 to subscribe the changes from node3, here
+    copy_data is specified as force so that the existing table data is
+    copied during initial sync:
+<programlisting>
+node1=# CREATE SUBSCRIPTION sub_node1_node3
+node1=# CONNECTION 'dbname=foo host=node3 user=repuser'
+node1=# PUBLICATION pub_node3
+node1=# WITH (copy_data = force, local_only = on);
+CREATE SUBSCRIPTION
+</programlisting>
+   </para>
+
+   <para>
+    Create subscription in node2 to subscribe the changes from node3, here
+    copy_data is specified as force so that the existing table data is
+    copied during initial sync:
+<programlisting>
+node2=# CREATE SUBSCRIPTION sub_node2_node3
+node2=# CONNECTION 'dbname=foo host=node3 user=repuser'
+node2=# PUBLICATION pub_node3
+node2=# WITH (copy_data = force, local_only = on);
+CREATE SUBSCRIPTION
+</programlisting>
+   </para>
+
+   <para>
+    Adjust the publication publish settings so that truncate is not published
+    to the subscribers and truncate the table data in node3:
+<programlisting>
+node3=# ALTER PUBLICATION pub_node3 SET (publish='insert,update,delete');
+ALTER PUBLICATION
+node3=# TRUNCATE t1;
+TRUNCATE TABLE
+node3=# ALTER PUBLICATION pub_node3 SET (publish='insert,update,delete,truncate');
+ALTER PUBLICATION
+</programlisting>
+   </para>
+
+   <para>
+    Create subscription in node3 to subscribe the changes from node1 and
+    node2, here copy_data is specified as force when creating subscription
+    to node1 so that the existing table data is copied during initial sync:
+<programlisting>
+node3=# CREATE SUBSCRIPTION
+node3=# sub_node3_node1 CONNECTION 'dbname=foo host=node1 user=repuser'
+node3=# PUBLICATION pub_node1
+node3=# WITH (copy_data = force, local_only = on);
+CREATE SUBSCRIPTION
+node3=# CREATE SUBSCRIPTION
+node3=# sub_node3_node2 CONNECTION 'dbname=foo host=node2 user=repuser'
+node3=# PUBLICATION pub_node2
+node3=# WITH (copy_data = off, local_only = on);
+CREATE SUBSCRIPTION
+</programlisting>
+   </para>
+  </sect2>
+ </sect1>
+
 </chapter>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index c5ebcf5500..1e33b3fcdd 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -161,12 +161,20 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
 
       <variablelist>
        <varlistentry>
-        <term><literal>copy_data</literal> (<type>boolean</type>)</term>
+        <term><literal>copy_data</literal> (<type>enum</type>)</term>
         <listitem>
          <para>
           Specifies whether to copy pre-existing data in the publications
-          that are being subscribed to when the replication starts.
-          The default is <literal>true</literal>.
+          that are being subscribed to when the replication starts. This
+          parameter may be either <literal>true</literal>,
+          <literal>false</literal> or <literal>force</literal>. The default is
+          <literal>true</literal>.
+         </para>
+         <para>
+          There is some interaction between the "local_only" option and
+          "copy_data" option. Refer to the
+          <xref linkend="sql-createsubscription-notes" /> for interaction
+          details and usage of force for copy_data option.
          </para>
          <para>
           Previously subscribed tables are not copied, even if a table's row
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index c09f7b0600..d6caed500f 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -161,6 +161,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           publisher node changes regardless of their origin. The default is
           <literal>false</literal>.
          </para>
+         <para>
+          There is some interaction between the "local_only" option and
+          "copy_data" option. Refer to the
+          <xref linkend="sql-createsubscription-notes" /> for details.
+         </para>
         </listitem>
        </varlistentry>
 
@@ -213,18 +218,27 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
        </varlistentry>
 
        <varlistentry>
-        <term><literal>copy_data</literal> (<type>boolean</type>)</term>
+        <term><literal>copy_data</literal> (<type>enum</type>)</term>
         <listitem>
          <para>
           Specifies whether to copy pre-existing data in the publications
-          that are being subscribed to when the replication starts.
-          The default is <literal>true</literal>.
+          that are being subscribed to when the replication starts. This
+          parameter may be either <literal>true</literal>,
+          <literal>false</literal> or <literal>force</literal>. The default is
+          <literal>true</literal>.
          </para>
          <para>
           If the publications contain <literal>WHERE</literal> clauses, it
           will affect what data is copied. Refer to the
           <xref linkend="sql-createsubscription-notes" /> for details.
          </para>
+
+         <para>
+          There is some interaction between the "local_only" option and
+          "copy_data" option. Refer to the
+          <xref linkend="sql-createsubscription-notes" /> for interaction
+          details and usage of force value for copy_data option.
+         </para>
         </listitem>
        </varlistentry>
 
@@ -374,6 +388,16 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    can have non-existent publications.
   </para>
 
+  <para>
+   If the subscription is created with local_only = on and copy_data = on,
+   the command checks whether the publisher's tables are themselves subscribed
+   from another publisher and, if so, throws an error to prevent inconsistent
+   data in the subscription. The user can proceed with the copy operation in
+   this case by specifying copy_data = force. Refer to
+   <xref linkend="bidirectional-logical-replication"/> for how
+   copy_data and local_only can be used in bidirectional replication.
+  </para>
+
  </refsect1>
 
  <refsect1>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 21fd805a81..1445065365 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -69,6 +69,18 @@
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
 
+#define IS_COPY_DATA_ON_OR_FORCE(copy_data) ((copy_data) != COPY_DATA_OFF)
+
+/*
+ * Represents whether copy_data option is specified with off, on or force.
+ */
+typedef enum CopyData
+{
+	COPY_DATA_OFF,
+	COPY_DATA_ON,
+	COPY_DATA_FORCE
+} CopyData;
+
 /*
  * Structure to hold a bitmap representing the user-provided CREATE/ALTER
  * SUBSCRIPTION command options and the parsed/default values of each of them.
@@ -81,7 +93,7 @@ typedef struct SubOpts
 	bool		connect;
 	bool		enabled;
 	bool		create_slot;
-	bool		copy_data;
+	CopyData	copy_data;
 	bool		refresh;
 	bool		binary;
 	bool		streaming;
@@ -91,11 +103,66 @@ typedef struct SubOpts
 	XLogRecPtr	lsn;
 } SubOpts;
 
-static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static List *fetch_table_list(WalReceiverConn *wrconn, List *publications,
+							  CopyData copydata, bool local_only);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 
+/*
+ * Parse the copy_data option into a CopyData value; error out if invalid.
+ */
+static CopyData
+DefGetCopyData(DefElem *def)
+{
+	/*
+	 * If no parameter given, assume "true" is meant.
+	 */
+	if (def->arg == NULL)
+		return COPY_DATA_ON;
+
+	/*
+	 * Allow 0, 1, "true", "false", "on", "off" or "force" (case-insensitive).
+	 */
+	switch (nodeTag(def->arg))
+	{
+		case T_Integer:
+			switch (intVal(def->arg))
+			{
+				case 0:
+					return COPY_DATA_OFF;
+				case 1:
+					return COPY_DATA_ON;
+				default:
+					/* any other integer: error out below */
+					break;
+			}
+			break;
+		default:
+		{
+			char    *sval = defGetString(def);
+
+			/*
+			 * The set of strings accepted here should match up with
+			 * the grammar's opt_boolean_or_string production.
+			 */
+			if (pg_strcasecmp(sval, "true") == 0 ||
+				pg_strcasecmp(sval, "on") == 0)
+				return COPY_DATA_ON;
+			if (pg_strcasecmp(sval, "false") == 0 ||
+				pg_strcasecmp(sval, "off") == 0)
+				return COPY_DATA_OFF;
+			if (pg_strcasecmp(sval, "force") == 0)
+				return COPY_DATA_FORCE;
+		}
+		break;
+	}
+
+	ereport(ERROR,
+			errcode(ERRCODE_SYNTAX_ERROR),
+			errmsg("%s requires a boolean or \"force\"", def->defname));
+	return COPY_DATA_OFF;                                           /* keep compiler quiet */
+}
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -128,7 +195,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 	if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
 		opts->create_slot = true;
 	if (IsSet(supported_opts, SUBOPT_COPY_DATA))
-		opts->copy_data = true;
+		opts->copy_data = COPY_DATA_ON;
 	if (IsSet(supported_opts, SUBOPT_REFRESH))
 		opts->refresh = true;
 	if (IsSet(supported_opts, SUBOPT_BINARY))
@@ -196,7 +263,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 				errorConflictingDefElem(defel, pstate);
 
 			opts->specified_opts |= SUBOPT_COPY_DATA;
-			opts->copy_data = defGetBoolean(defel);
+			opts->copy_data = DefGetCopyData(defel);
 		}
 		else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
 				 strcmp(defel->defname, "synchronous_commit") == 0)
@@ -333,17 +400,17 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 					 errmsg("%s and %s are mutually exclusive options",
 							"connect = false", "create_slot = true")));
 
-		if (opts->copy_data &&
+		if (IS_COPY_DATA_ON_OR_FORCE(opts->copy_data) &&
 			IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("%s and %s are mutually exclusive options",
-							"connect = false", "copy_data = true")));
+							"connect = false", "copy_data = true/force")));
 
 		/* Change the defaults of other options. */
 		opts->enabled = false;
 		opts->create_slot = false;
-		opts->copy_data = false;
+		opts->copy_data = COPY_DATA_OFF;
 	}
 
 	/*
@@ -671,13 +738,14 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			 * Set sync state based on if we were asked to do data copy or
 			 * not.
 			 */
-			table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
+			table_state = IS_COPY_DATA_ON_OR_FORCE(opts.copy_data) ? SUBREL_STATE_INIT : SUBREL_STATE_READY;
 
 			/*
 			 * Get the table list from publisher and build local table status
 			 * info.
 			 */
-			tables = fetch_table_list(wrconn, publications);
+			tables = fetch_table_list(wrconn, publications, opts.copy_data,
+									  opts.local_only);
 			foreach(lc, tables)
 			{
 				RangeVar   *rv = (RangeVar *) lfirst(lc);
@@ -720,7 +788,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				 * PENDING, to allow ALTER SUBSCRIPTION ... REFRESH
 				 * PUBLICATION to work.
 				 */
-				if (opts.twophase && !opts.copy_data && tables != NIL)
+				if (opts.twophase && opts.copy_data == COPY_DATA_OFF &&
+					tables != NIL)
 					twophase_enabled = true;
 
 				walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
@@ -761,7 +830,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data,
+AlterSubscription_refresh(Subscription *sub, CopyData copy_data,
 						  List *validate_publications)
 {
 	char	   *err;
@@ -797,7 +866,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			check_publications(wrconn, validate_publications);
 
 		/* Get the table list from publisher. */
-		pubrel_names = fetch_table_list(wrconn, sub->publications);
+		pubrel_names = fetch_table_list(wrconn, sub->publications, copy_data,
+										sub->local_only);
 
 		/* Get local table list. */
 		subrel_states = GetSubscriptionRelations(sub->oid);
@@ -851,7 +921,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 						 list_length(subrel_states), sizeof(Oid), oid_cmp))
 			{
 				AddSubscriptionRelState(sub->oid, relid,
-										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
+										IS_COPY_DATA_ON_OR_FORCE(copy_data) ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
 										InvalidXLogRecPtr);
 				ereport(DEBUG1,
 						(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
@@ -1157,7 +1227,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					 * See ALTER_SUBSCRIPTION_REFRESH for details why this is
 					 * not allowed.
 					 */
-					if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
+					if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && IS_COPY_DATA_ON_OR_FORCE(opts.copy_data))
 						ereport(ERROR,
 								(errcode(ERRCODE_SYNTAX_ERROR),
 								 errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"),
@@ -1236,7 +1306,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
 
 				parse_subscription_options(pstate, stmt->options,
-										   SUBOPT_COPY_DATA, &opts);
+										   SUBOPT_COPY_DATA,
+										   &opts);
 
 				/*
 				 * The subscription option "two_phase" requires that
@@ -1255,7 +1326,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				 *
 				 * For more details see comments atop worker.c.
 				 */
-				if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data)
+				if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
+					IS_COPY_DATA_ON_OR_FORCE(opts.copy_data))
 					ereport(ERROR,
 							(errcode(ERRCODE_SYNTAX_ERROR),
 							 errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"),
@@ -1778,22 +1850,27 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
  * publisher connection.
  */
 static List *
-fetch_table_list(WalReceiverConn *wrconn, List *publications)
+fetch_table_list(WalReceiverConn *wrconn, List *publications, CopyData copydata,
+				 bool local_only)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
-	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+	Oid			tableRow[3] = {TEXTOID, TEXTOID, CHAROID};
 	List	   *tablelist = NIL;
 
 	initStringInfo(&cmd);
-	appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n"
-						   "  FROM pg_catalog.pg_publication_tables t\n"
-						   " WHERE t.pubname IN (");
+	appendStringInfoString(&cmd,
+						   "SELECT DISTINCT N.nspname AS schemaname, C.relname AS tablename, PS.srrelid as replicated\n"
+						   "FROM pg_publication P,\n"
+						   "LATERAL pg_get_publication_tables(P.pubname) GPT\n"
+						   "LEFT JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
+						   "pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
+						   "WHERE C.oid = GPT.relid AND P.pubname in (");
 	get_publications_str(publications, &cmd, true);
 	appendStringInfoChar(&cmd, ')');
 
-	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+	res = walrcv_exec(wrconn, cmd.data, 3, tableRow);
 	pfree(cmd.data);
 
 	if (res->status != WALRCV_OK_TUPLES)
@@ -1819,6 +1896,25 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
 		rv = makeRangeVar(nspname, relname, -1);
 		tablelist = lappend(tablelist, rv);
 
+		/*
+		 * XXX: During initial table sync we cannot differentiate between the
+		 * local and non-local data that is present in the HEAP. Identification
+		 * of local data can be done only from the WAL by using the origin id.
+		 * Throw an error so that the user can take care of the initial data
+		 * copying and then create subscription with copy_data off.
+		 *
+		 * It is quite possible that subscriber has not yet pulled data to
+		 * the tables, but in ideal cases the table data will be subscribed.
+		 * To keep the code simple it is not checked if the subscriber table
+		 * has pulled the data or not.
+		 */
+		if (copydata == COPY_DATA_ON && local_only && !slot_attisnull(slot, 3))
+			ereport(ERROR,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("CREATE/ALTER SUBSCRIPTION with local_only and copy_data as true is not allowed when the publisher might have replicated data, table:%s.%s might have replicated data in the publisher",
+						   nspname, relname),
+					errhint("Use CREATE/ALTER SUBSCRIPTION with copy_data = off or force"));
+
 		ExecClearTuple(slot);
 	}
 	ExecDropSingleTupleTableSlot(slot);
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 8bf7c810a5..2967c1cf0a 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -47,7 +47,13 @@ ERROR:  must be superuser to create subscriptions
 SET SESSION AUTHORIZATION 'regress_subscription_user';
 -- fail - invalid option combinations
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true);
-ERROR:  connect = false and copy_data = true are mutually exclusive options
+ERROR:  connect = false and copy_data = true/force are mutually exclusive options
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = force);
+ERROR:  connect = false and copy_data = true/force are mutually exclusive options
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = on);
+ERROR:  connect = false and copy_data = true/force are mutually exclusive options
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = 1);
+ERROR:  connect = false and copy_data = true/force are mutually exclusive options
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true);
 ERROR:  connect = false and enabled = true are mutually exclusive options
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, create_slot = true);
@@ -93,6 +99,16 @@ ALTER SUBSCRIPTION regress_testsub4 SET (local_only = false);
 
 DROP SUBSCRIPTION regress_testsub3;
 DROP SUBSCRIPTION regress_testsub4;
+-- ok - valid copy_data options
+CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = off);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = false);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = 0);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
+DROP SUBSCRIPTION regress_testsub5;
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 327a1e2500..7b2915b7cd 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -40,6 +40,9 @@ SET SESSION AUTHORIZATION 'regress_subscription_user';
 
 -- fail - invalid option combinations
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true);
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = force);
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = on);
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = 1);
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true);
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, create_slot = true);
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, enabled = true);
@@ -66,6 +69,15 @@ ALTER SUBSCRIPTION regress_testsub4 SET (local_only = false);
 DROP SUBSCRIPTION regress_testsub3;
 DROP SUBSCRIPTION regress_testsub4;
 
+-- ok - valid copy_data options
+CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = off);
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = false);
+CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = 0);
+
+DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
+DROP SUBSCRIPTION regress_testsub5;
+
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 
diff --git a/src/test/subscription/t/032_localonly.pl b/src/test/subscription/t/032_localonly.pl
index a8b2df27b6..1dc805a146 100644
--- a/src/test/subscription/t/032_localonly.pl
+++ b/src/test/subscription/t/032_localonly.pl
@@ -8,6 +8,115 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+my $result;
+my $stdout;
+my $stderr;
+
+my $subname_AB = 'tap_sub_A_B';
+my $subname_AC = 'tap_sub_A_C';
+my $subname_BA = 'tap_sub_B_A';
+my $subname_BC = 'tap_sub_B_C';
+my $subname_CA = 'tap_sub_C_A';
+my $subname_CB = 'tap_sub_C_B';
+
+# Detach node C (drop every subscription to and from it) and clean the table contents.
+sub detach_node_clean_table_data
+{
+	my ($node_A, $node_B, $node_C) = @_;
+	$node_A->safe_psql('postgres', "DROP SUBSCRIPTION $subname_AC");
+	$node_B->safe_psql('postgres', "DROP SUBSCRIPTION $subname_BC");
+	$node_C->safe_psql('postgres', "DROP SUBSCRIPTION $subname_CA");
+	$node_C->safe_psql('postgres', "DROP SUBSCRIPTION $subname_CB");
+
+	$result =
+	  $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+	is($result, qq(1), 'check subscription was dropped on subscriber');
+
+	$result =
+	  $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+	is($result, qq(1), 'check subscription was dropped on subscriber');
+
+	$result =
+	  $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+	is($result, qq(0), 'check subscription was dropped on subscriber');
+
+	$result = $node_A->safe_psql('postgres',
+		"SELECT count(*) FROM pg_replication_slots");
+	is($result, qq(1), 'check replication slot was dropped on publisher');
+
+	$result = $node_B->safe_psql('postgres',
+		"SELECT count(*) FROM pg_replication_slots");
+	is($result, qq(1), 'check replication slot was dropped on publisher');
+
+	$result = $node_C->safe_psql('postgres',
+		"SELECT count(*) FROM pg_replication_slots");
+	is($result, qq(0), 'check replication slot was dropped on publisher');
+
+	$node_A->safe_psql('postgres', "TRUNCATE tab_full");
+	$node_B->safe_psql('postgres', "TRUNCATE tab_full");
+	$node_C->safe_psql('postgres', "TRUNCATE tab_full");
+}
+
+# Subroutine to verify that the data is replicated successfully to all nodes.
+sub verify_data
+{
+	my ($node_A, $node_B, $node_C, $expect) = @_;
+
+	$node_A->wait_for_catchup($subname_BA);
+	$node_A->wait_for_catchup($subname_CA);
+	$node_B->wait_for_catchup($subname_AB);
+	$node_B->wait_for_catchup($subname_CB);
+	$node_C->wait_for_catchup($subname_AC);
+	$node_C->wait_for_catchup($subname_BC);
+
+	# check that tab_full on every node matches the expected contents
+	$result =
+	  $node_A->safe_psql('postgres', "SELECT * FROM tab_full order by 1;");
+	is($result, qq($expect),
+	   'Inserted successfully without leading to infinite recursion in circular replication setup'
+	);
+
+	$result =
+	  $node_B->safe_psql('postgres', "SELECT * FROM tab_full order by 1;");
+	is($result, qq($expect),
+	   'Inserted successfully without leading to infinite recursion in circular replication setup'
+	);
+
+	$result =
+	  $node_C->safe_psql('postgres', "SELECT * FROM tab_full order by 1;");
+	is($result, qq($expect),
+	   'Inserted successfully without leading to infinite recursion in circular replication setup'
+	);
+}
+
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+
+# Subroutine to create a subscription and wait until initial sync is completed.
+# Expects, in order: the subscriber node, the publisher node, the subscription
+# name, the connection string used in the CONNECTION clause, the publication
+# name and the option list to place inside the subscription's WITH (...).
+sub create_subscription
+{
+	my ($node_subscriber, $node_publisher, $sub_name, $node_connstr,
+		$pub_name, $with_options)
+	  = @_;
+
+	# application_name is always assigned the same value as the subscription
+	# name.
+	$node_subscriber->safe_psql(
+		'postgres', "
+                CREATE SUBSCRIPTION $sub_name
+                CONNECTION '$node_connstr application_name=$sub_name'
+                PUBLICATION $pub_name
+                WITH ($with_options)");
+	$node_publisher->wait_for_catchup($sub_name);
+
+	# also wait for initial table sync to finish
+	$node_subscriber->poll_query_until('postgres', $synced_query)
+	  or die "Timed out while waiting for subscriber to synchronize data";
+}
+
 ###############################################################################
 # Setup a bidirectional logical replication between Node_A & Node_B
 ###############################################################################
@@ -43,42 +152,19 @@ $node_B->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)");
 my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
 $node_A->safe_psql('postgres',
 	"CREATE PUBLICATION tap_pub_A FOR TABLE tab_full");
-my $appname_B1 = 'tap_sub_B1';
-$node_B->safe_psql(
-	'postgres', "
-	CREATE SUBSCRIPTION tap_sub_B1
-	CONNECTION '$node_A_connstr application_name=$appname_B1'
-	PUBLICATION tap_pub_A
-	WITH (local_only = on)");
+
+create_subscription($node_B, $node_A, $subname_BA, $node_A_connstr,
+       'tap_pub_A', 'copy_data = on, local_only = on');
 
 # node_B (pub) -> node_A (sub)
 my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
 $node_B->safe_psql('postgres',
 	"CREATE PUBLICATION tap_pub_B FOR TABLE tab_full");
-my $appname_A = 'tap_sub_A';
-$node_A->safe_psql(
-	'postgres', "
-	CREATE SUBSCRIPTION tap_sub_A
-	CONNECTION '$node_B_connstr application_name=$appname_A'
-	PUBLICATION tap_pub_B
-	WITH (local_only = on, copy_data = off)");
-
-# Wait for subscribers to finish initialization
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_A->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-$node_B->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+create_subscription($node_A, $node_B, $subname_AB, $node_B_connstr,
+       'tap_pub_B', 'copy_data = off, local_only = on');
 
 is(1, 1, "Circular replication setup is complete");
 
-my $result;
-
 ###############################################################################
 # check that bidirectional logical replication setup does not cause infinite
 # recursive insertion.
@@ -88,19 +174,19 @@ my $result;
 $node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);");
 $node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);");
 
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
 
 # check that transaction was committed on subscriber(s)
 $result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full;");
-is( $result, qq(11
+is($result, qq(11
 12),
-	'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
+   'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
 );
 $result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full;");
-is( $result, qq(11
+is($result, qq(11
 12),
-	'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
+   'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
 );
 
 ###############################################################################
@@ -125,30 +211,20 @@ my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
 $node_C->safe_psql('postgres',
 	"CREATE PUBLICATION tap_pub_C FOR TABLE tab_full");
 
-my $appname_B2 = 'tap_sub_B2';
-$node_B->safe_psql(
-	'postgres', "
-	CREATE SUBSCRIPTION tap_sub_B2
-	CONNECTION '$node_C_connstr application_name=$appname_B2'
-	PUBLICATION tap_pub_C
-	WITH (local_only = on)");
-
-$node_C->wait_for_catchup($appname_B2);
-
-$node_C->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr,
+       'tap_pub_C', 'copy_data = on, local_only = on');
 
 # insert a record
 $node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (13);");
 
-$node_C->wait_for_catchup($appname_B2);
-$node_B->wait_for_catchup($appname_A);
+$node_C->wait_for_catchup($subname_BC);
+$node_B->wait_for_catchup($subname_AB);
 
 $result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full;");
-is( $result, qq(11
+is($result, qq(11
 12
 13),
-	'Node_C data replicated to Node_B'
+   'Node_C data replicated to Node_B'
 );
 
 # check that the data published from node_C to node_B is not sent to node_A
@@ -158,6 +234,168 @@ is( $result, qq(11
 	'Remote data originated from other node is not replicated when local_only option is ON'
 );
 
+# clear the operations done by this test
+$node_B->safe_psql(
+       'postgres', "
+        DROP SUBSCRIPTION $subname_BC");
+$node_C->safe_psql(
+	    'postgres', "
+        DELETE FROM tab_full");
+$node_B->safe_psql(
+	    'postgres', "
+        DELETE FROM tab_full where a = 13");
+
+###############################################################################
+# Specifying local_only 'on' indicates that the publisher (node_B) should only
+# replicate the changes that were generated locally on node_B. In this case
+# node_B also subscribes to node_A, so node_B can hold data that originated on
+# node_A; creating the subscription with copy_data = on must therefore throw an
+# error, to prevent that non-local data from being copied to the subscriber.
+###############################################################################
+($result, $stdout, $stderr) = $node_A->psql(
+       'postgres', "
+        CREATE SUBSCRIPTION tap_sub_A3
+        CONNECTION '$node_B_connstr application_name=$subname_AB'
+        PUBLICATION tap_pub_B
+        WITH (local_only = on, copy_data = on)");
+like(
+       $stderr,
+       qr/ERROR:  CREATE\/ALTER SUBSCRIPTION with local_only and copy_data as true is not allowed when the publisher might have replicated data/,
+       "Create subscription with local_only and copy_data having replicated table in publisher"
+);
+
+# Creating a subscription with local_only = on and copy_data = force should
+# succeed even when the publisher has replicated data
+$node_A->safe_psql(
+       'postgres', "
+        CREATE SUBSCRIPTION tap_sub_A2
+        CONNECTION '$node_B_connstr application_name=$subname_AC'
+        PUBLICATION tap_pub_B
+        WITH (local_only = on, copy_data = force)");
+
+$node_A->safe_psql(
+       'postgres', "
+        DROP SUBSCRIPTION tap_sub_A2");
+
+###############################################################################
+# Join 3rd node (node_C) to the existing 2 nodes (node_A & node_B) bidirectional
+# replication setup when the existing nodes (node_A & node_B) have pre-existing
+# data and the new node (node_C) does not have any data.
+###############################################################################
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full order by 1;");
+is( $result, qq(11
+12), 'Check existing data');
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full order by 1;");
+is( $result, qq(11
+12), 'Check existing data');
+
+$result =
+	$node_C->safe_psql('postgres', "SELECT * FROM tab_full order by 1;");
+is( $result, qq(), 'Check existing data');
+
+create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr,
+       'tap_pub_C', 'copy_data = off, local_only = on');
+create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr,
+       'tap_pub_C', 'copy_data = off, local_only = on');
+create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr,
+       'tap_pub_A', 'copy_data = force, local_only = on');
+create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr,
+       'tap_pub_B', 'copy_data = off, local_only = on');
+
+# insert some data in all the nodes
+$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (13);");
+$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (23);");
+$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (33);");
+
+verify_data($node_A, $node_B, $node_C, '11
+12
+13
+23
+33');
+
+detach_node_clean_table_data($node_A, $node_B, $node_C);
+
+###############################################################################
+# Join 3rd node (node_C) to the existing 2 nodes (node_A & node_B) bidirectional
+# replication setup when neither the existing nodes (node_A & node_B) nor the
+# new node (node_C) have any data.
+###############################################################################
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full order by 1;");
+is( $result, qq(), 'Check existing data');
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full order by 1;");
+is( $result, qq(), 'Check existing data');
+
+$result = $node_C->safe_psql('postgres', "SELECT * FROM tab_full order by 1;");
+is( $result, qq(), 'Check existing data');
+
+create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr,
+       'tap_pub_C', 'copy_data = off, local_only = on');
+create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr,
+       'tap_pub_C', 'copy_data = off, local_only = on');
+create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr,
+       'tap_pub_A', 'copy_data = off, local_only = on');
+create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr,
+       'tap_pub_B', 'copy_data = off, local_only = on');
+
+# insert some data in all the nodes
+$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);");
+$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (21);");
+$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (31);");
+
+verify_data($node_A, $node_B, $node_C, '11
+21
+31');
+
+detach_node_clean_table_data($node_A, $node_B, $node_C);
+
+###############################################################################
+# Join 3rd node (node_C) to the existing 2 nodes (node_A & node_B) bidirectional
+# replication setup when the existing nodes (node_A & node_B) have no data and
+# the new node (node_C) has some pre-existing data.
+###############################################################################
+$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (31);");
+
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full order by 1;");
+is( $result, qq(), 'Check existing data');
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full order by 1;");
+is( $result, qq(), 'Check existing data');
+
+$result = $node_C->safe_psql('postgres', "SELECT * FROM tab_full order by 1;");
+is($result, qq(31), 'Check existing data');
+
+create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr,
+       'tap_pub_C', 'copy_data = force, local_only = on');
+create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr,
+       'tap_pub_C', 'copy_data = force, local_only = on');
+
+$node_C->safe_psql('postgres',
+       "ALTER PUBLICATION tap_pub_C SET (publish='insert,update,delete');");
+
+$node_C->safe_psql('postgres', "TRUNCATE tab_full");
+
+# include truncates now
+$node_C->safe_psql('postgres',
+       "ALTER PUBLICATION tap_pub_C SET (publish='insert,update,delete,truncate');"
+);
+
+create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr,
+       'tap_pub_A', 'copy_data = force, local_only = on');
+create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr,
+       'tap_pub_B', 'copy_data = off, local_only = on');
+
+# insert some data in all the nodes
+$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);");
+$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (22);");
+$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (32);");
+
+verify_data($node_A, $node_B, $node_C, '12
+22
+31
+32');
+
 # shutdown
 $node_B->stop('fast');
 $node_A->stop('fast');
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 87ee7bf866..3edea910de 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -442,6 +442,7 @@ ConvProcInfo
 ConversionLocation
 ConvertRowtypeExpr
 CookedConstraint
+CopyData
 CopyDest
 CopyFormatOptions
 CopyFromState
-- 
2.32.0

Reply via email to