On Thu, Jul 28, 2022 at 11:28 AM Peter Smith <smithpb2...@gmail.com> wrote:
>
> Hi Vignesh.
>
> FYI the v39* patch fails to apply [1]. Can you please rebase it?
>
>
> [1]
> === Applying patches on top of PostgreSQL commit ID
> 5f858dd3bebd1f3845aef2bff7f4345bfb7b74b3 ===
> === applying patch
> ./v39-0001-Check-and-throw-an-error-if-publication-tables-w.patch
> patching file doc/src/sgml/ref/alter_subscription.sgml
> patching file doc/src/sgml/ref/create_subscription.sgml
> patching file src/backend/commands/subscriptioncmds.c
> Hunk #10 FAILED at 886.
> 1 out of 14 hunks FAILED -- saving rejects to file
> src/backend/commands/subscriptioncmds.c.rej
> patching file src/test/regress/expected/subscription.out
> patching file src/test/regress/sql/subscription.sql
> patching file src/test/subscription/t/030_origin.pl
> patching file src/tools/pgindent/typedefs.list
>
> ------

Please find attached the v40 patch, rebased on top of HEAD.

Regards,
Vignesh
From e59c864801c066aff069deb528427788bd0cff0a Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Thu, 28 Jul 2022 19:11:35 +0530
Subject: [PATCH v40 1/2] Check and throw an error if publication tables were
 also subscribed from other publishers and support force value for copy_data
 parameter.

This patch does two things:
1) Checks for and throws an error if 'copy_data = on' and 'origin = none'
are specified but the publication tables were also replicated from other
publishers.
2) Adds a 'force' value for the copy_data parameter.
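The following minimal standalone sketch makes the accepted spellings concrete by mapping a copy_data value onto the three settings. It mirrors defGetCopyData() in this patch, but parse_copy_data() and copy_data_mode are hypothetical names. Because the sketch works on plain strings rather than parser nodes, it approximates the integers 0 and 1 with the strings "0" and "1".

```c
#include <stddef.h>
#include <string.h>
#include <strings.h>			/* strcasecmp() is POSIX, not ISO C */

/* Hypothetical mirror of the patch's CopyData enum, plus an error marker. */
typedef enum
{
	COPY_MODE_OFF = 0,
	COPY_MODE_ON,
	COPY_MODE_FORCE,
	COPY_MODE_INVALID			/* the patch raises an ERROR instead */
} copy_data_mode;

/*
 * Map the text of a copy_data option to a mode.  A NULL value models a bare
 * "copy_data" with no argument, which means "on".  Comparisons of the
 * keywords are case-insensitive, as in the patch.
 */
static copy_data_mode
parse_copy_data(const char *val)
{
	if (val == NULL)
		return COPY_MODE_ON;
	if (strcmp(val, "0") == 0 ||
		strcasecmp(val, "false") == 0 || strcasecmp(val, "off") == 0)
		return COPY_MODE_OFF;
	if (strcmp(val, "1") == 0 ||
		strcasecmp(val, "true") == 0 || strcasecmp(val, "on") == 0)
		return COPY_MODE_ON;
	if (strcasecmp(val, "force") == 0)
		return COPY_MODE_FORCE;
	/* e.g. "2": the patch reports "copy_data requires a boolean or "force"" */
	return COPY_MODE_INVALID;
}
```

For example, parse_copy_data("FORCE") yields COPY_MODE_FORCE and parse_copy_data("2") yields COPY_MODE_INVALID. The real code reports an error in the second case.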

-------------------------------------------------------------------------------
The example below demonstrates how the new error is useful:

The initial copy phase has no way to know the origin of the row data,
so if 'copy_data = on' is used in step 4 below, an error is thrown to
prevent any potentially non-local data from being copied:

e.g.
CREATE SUBSCRIPTION sub_node2_node1 CONNECTION '<node1 details>'
PUBLICATION pub_node1 WITH (copy_data = on, origin = none);
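ERROR:  could not replicate table "public.t1"
DETAIL:  CREATE/ALTER SUBSCRIPTION with origin = none and copy_data = true is
not allowed when the publisher has subscribed same table.
HINT:  Use CREATE/ALTER SUBSCRIPTION with copy_data = false/force.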
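The rule behind this error can be sketched as a small predicate for one published table. This is a simplified model of check_pub_table_subscribed(), not the function itself. copy_would_error() and its boolean inputs are hypothetical stand-ins for two things: the query run against the publisher's catalogs, and the subscriber's list of already-subscribed tables.

```c
#include <stdbool.h>
#include <stddef.h>
#include <strings.h>			/* strcasecmp() */

/* Hypothetical three-valued copy_data setting. */
typedef enum
{
	COPY_OFF = 0,
	COPY_ON,
	COPY_FORCE
} copy_mode;

/*
 * Return true if CREATE/ALTER SUBSCRIPTION should fail for one table.
 *   origin:                  subscription's origin setting (NULL = unset)
 *   subscribed_on_publisher: the publisher itself subscribes to the table
 *   already_on_subscriber:   the table is already in this subscription
 *                            (only possible for ALTER ... REFRESH)
 */
static bool
copy_would_error(copy_mode copy_data, const char *origin,
				 bool subscribed_on_publisher, bool already_on_subscriber)
{
	/* Only copy_data = on with origin = none is checked; force skips it. */
	if (copy_data != COPY_ON || origin == NULL ||
		strcasecmp(origin, "none") != 0)
		return false;

	/* Existing tables get changes via WAL, where the origin is known. */
	if (already_on_subscriber)
		return false;

	return subscribed_on_publisher;
}
```

The model has one design consequence. 'copy_data = force' does not make the copy any safer. It only tells the check to trust the user's knowledge that the publisher's table holds no non-local data.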

-------------------------------------------------------------------------------
The following steps demonstrate how the 'copy_data = force' option is
useful:

Let's take a scenario where the user wants to set up bidirectional
logical replication between node1 and node2 where the same table on
node1 has pre-existing data and node2 has no pre-existing data.

e.g.
node1: Table t1 (c1 int) has data 11, 12, 13, 14
node2: Table t1 (c1 int) has no pre-existing data

The following steps are required in this case:
step 1:
node1=# CREATE PUBLICATION pub_node1 FOR TABLE t1;
CREATE PUBLICATION

step 2:
node2=# CREATE PUBLICATION pub_node2 FOR TABLE t1;
CREATE PUBLICATION

step 3:
node1=# CREATE SUBSCRIPTION sub_node1_node2 CONNECTION '<node2 details>'
node1-# PUBLICATION pub_node2;
CREATE SUBSCRIPTION

step 4:
node2=# CREATE SUBSCRIPTION sub_node2_node1 CONNECTION '<node1 details>'
node2-# PUBLICATION pub_node1;
CREATE SUBSCRIPTION

After the subscription is created on node2, node1's existing data will be
synced to node2, and the newly synced data will then be sent back to node1.
This process of node1 sending data to node2 and node2 sending data back to
node1 will repeat infinitely. If table t1 has a unique key, this will lead
to a unique key violation and replication won't proceed.
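The toy model below shows why the loop happens and why origin = none stops it. It is an illustration only, not how the walsender or apply worker is implemented. It assumes the documented origin semantics: a change applied by a subscription is tagged with a replication origin, and a publisher serving an origin = none subscription sends only changes that have no origin. The names change, publisher_sends() and count_hops() are hypothetical.

```c
#include <stdbool.h>

#define LOCAL_ORIGIN 0			/* change made directly on the node */

/* Hypothetical model of a replicated change and its origin tag. */
typedef struct
{
	int			value;
	int			origin;			/* LOCAL_ORIGIN, or a non-local origin id */
} change;

/* Would a publisher send this change to an origin = none/any subscriber? */
static bool
publisher_sends(change c, bool origin_none)
{
	return !origin_none || c.origin == LOCAL_ORIGIN;
}

/*
 * Nodes 1 and 2 subscribe to each other with the same origin setting.
 * Starting from a local insert on node 1, count how many times the change
 * is applied on a peer, giving up after max_hops.
 */
static int
count_hops(bool origin_none, int max_hops)
{
	change		c = {11, LOCAL_ORIGIN};
	int			sender = 1;
	int			hops = 0;

	while (hops < max_hops && publisher_sends(c, origin_none))
	{
		/* The receiving apply worker tags the change as non-local. */
		c.origin = sender;
		sender = (sender == 1) ? 2 : 1;
		hops++;
	}
	return hops;
}
```

With origin = none, count_hops(true, 10) returns 1: the row reaches node2 but is not sent back. With origin = any, count_hops(false, 10) runs until the cap of 10. In a real setup with a unique key, the second hop would already fail with a unique key violation on node1.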

This problem can be avoided by using the origin and copy_data parameters as
shown below. Steps 1 and 2 are the same as above.

step 3: Create a subscription on node1 to subscribe to node2:
node1=# CREATE SUBSCRIPTION sub_node1_node2 CONNECTION '<node2 details>'
node1-# PUBLICATION pub_node2 WITH (copy_data = off, origin = none);
CREATE SUBSCRIPTION

step 4: Create a subscription on node2 to subscribe to node1. Use
'copy_data = force' when creating a subscription to node1 so that the
existing table data is copied during the initial sync:
node2=# CREATE SUBSCRIPTION sub_node2_node1 CONNECTION '<node1 details>'
node2-# PUBLICATION pub_node1 WITH (copy_data = force, origin = none);
CREATE SUBSCRIPTION

Author: Vignesh C
Reviewed-By: Peter Smith, Amit Kapila, Jonathan Katz, Shi yu, Wang wei
Discussion: https://www.postgresql.org/message-id/CALDaNm0gwjY_4HFxvvty01BOT01q_fJLKQ3pWP9=9orqubh...@mail.gmail.com
---
 doc/src/sgml/ref/alter_subscription.sgml   |  14 +-
 doc/src/sgml/ref/create_subscription.sgml  |  32 +-
 src/backend/commands/subscriptioncmds.c    | 215 +++++++++++-
 src/test/regress/expected/subscription.out |  22 +-
 src/test/regress/sql/subscription.sql      |  14 +
 src/test/subscription/t/030_origin.pl      | 382 ++++++++++++++++++---
 src/tools/pgindent/typedefs.list           |   1 +
 7 files changed, 613 insertions(+), 67 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 64efc21f53..f4fb9c5282 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -161,12 +161,20 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
 
       <variablelist>
        <varlistentry>
-        <term><literal>copy_data</literal> (<type>boolean</type>)</term>
+        <term><literal>copy_data</literal> (<type>enum</type>)</term>
         <listitem>
          <para>
           Specifies whether to copy pre-existing data in the publications
-          that are being subscribed to when the replication starts.
-          The default is <literal>true</literal>.
+          that are being subscribed to when the replication starts. This
+          parameter may be <literal>true</literal>,
+          <literal>false</literal>, or <literal>force</literal>. The default is
+          <literal>true</literal>.
+         </para>
+         <para>
+          Refer to the <xref linkend="sql-createsubscription-notes"/> for the
+          usage of <literal>force</literal> for the <literal>copy_data</literal>
+          parameter and its interaction with the <literal>origin</literal>
+          parameter.
          </para>
          <para>
           Previously subscribed tables are not copied, even if a table's row
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 7390c715bc..a2cd3c211c 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -115,7 +115,8 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           (You cannot combine setting <literal>connect</literal>
           to <literal>false</literal> with
           setting <literal>create_slot</literal>, <literal>enabled</literal>,
-          or <literal>copy_data</literal> to <literal>true</literal>.)
+          or <literal>copy_data</literal> to
+          <literal>true</literal>/<literal>force</literal>.)
          </para>
 
          <para>
@@ -201,18 +202,26 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
        </varlistentry>
 
        <varlistentry>
-        <term><literal>copy_data</literal> (<type>boolean</type>)</term>
+        <term><literal>copy_data</literal> (<type>enum</type>)</term>
         <listitem>
          <para>
           Specifies whether to copy pre-existing data in the publications
-          that are being subscribed to when the replication starts.
-          The default is <literal>true</literal>.
+          that are being subscribed to when the replication starts. This
+          parameter may be <literal>true</literal>,
+          <literal>false</literal>, or <literal>force</literal>. The default is
+          <literal>true</literal>.
          </para>
          <para>
           If the publications contain <literal>WHERE</literal> clauses, it
           will affect what data is copied. Refer to the
           <xref linkend="sql-createsubscription-notes" /> for details.
          </para>
+         <para>
+          Refer to the <xref linkend="sql-createsubscription-notes"/> for the
+          usage of <literal>force</literal> for the <literal>copy_data</literal>
+          parameter and its interaction with the <literal>origin</literal>
+          parameter.
+         </para>
         </listitem>
        </varlistentry>
 
@@ -315,6 +324,12 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           to <literal>any</literal> means that the publisher sends changes
           regardless of their origin. The default is <literal>any</literal>.
          </para>
+         <para>
+          Refer to the <xref linkend="sql-createsubscription-notes"/> for the
+          usage of <literal>force</literal> for the <literal>copy_data</literal>
+          parameter and its interaction with the <literal>origin</literal>
+          parameter.
+         </para>
         </listitem>
        </varlistentry>
       </variablelist></para>
@@ -386,6 +401,15 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    can have non-existent publications.
   </para>
 
+  <para>
+   If the subscription is created with <literal>origin = NONE</literal> and
+   <literal>copy_data = true</literal>, it will check if the publisher has
+   subscribed to the same table from other publishers and, if so, throw an
+   error to prevent possible non-local data from being copied. The user can
+   override this check and continue with the copy operation by specifying
+   <literal>copy_data = force</literal>.
+  </para>
+
  </refsect1>
 
  <refsect1>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f73dfb6067..7404bb9a56 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -69,6 +69,16 @@
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
 
+/*
+ * Represents whether the copy_data parameter is specified as off, on, or force.
+ */
+typedef enum CopyData
+{
+	COPY_DATA_OFF = 0,
+	COPY_DATA_ON,
+	COPY_DATA_FORCE
+} CopyData;
+
 /*
  * Structure to hold a bitmap representing the user-provided CREATE/ALTER
  * SUBSCRIPTION command options and the parsed/default values of each of them.
@@ -81,7 +91,7 @@ typedef struct SubOpts
 	bool		connect;
 	bool		enabled;
 	bool		create_slot;
-	bool		copy_data;
+	CopyData	copy_data;
 	bool		refresh;
 	bool		binary;
 	bool		streaming;
@@ -92,10 +102,68 @@ typedef struct SubOpts
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void check_pub_table_subscribed(WalReceiverConn *wrconn,
+									   List *publications, CopyData copydata,
+									   char *origin, Oid *subrel_local_oids,
+									   int subrel_count);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 
+/*
+ * Validate the value specified for the copy_data parameter.
+ */
+static CopyData
+defGetCopyData(DefElem *def)
+{
+	/*
+	 * If no parameter value given, assume "true" is meant.
+	 */
+	if (def->arg == NULL)
+		return COPY_DATA_ON;
+
+	/*
+	 * Allow 0, 1, "true", "false", "on", "off" or "force".
+	 */
+	switch (nodeTag(def->arg))
+	{
+		case T_Integer:
+			switch (intVal(def->arg))
+			{
+				case 0:
+					return COPY_DATA_OFF;
+				case 1:
+					return COPY_DATA_ON;
+				default:
+					/* otherwise, error out below */
+					break;
+			}
+			break;
+		default:
+			{
+				char	   *sval = defGetString(def);
+
+				/*
+				 * The set of strings accepted here should match up with the
+				 * grammar's opt_boolean_or_string production.
+				 */
+				if (pg_strcasecmp(sval, "false") == 0 ||
+					pg_strcasecmp(sval, "off") == 0)
+					return COPY_DATA_OFF;
+				if (pg_strcasecmp(sval, "true") == 0 ||
+					pg_strcasecmp(sval, "on") == 0)
+					return COPY_DATA_ON;
+				if (pg_strcasecmp(sval, "force") == 0)
+					return COPY_DATA_FORCE;
+			}
+			break;
+	}
+
+	ereport(ERROR,
+			errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+			errmsg("%s requires a boolean or \"force\"", def->defname));
+	return COPY_DATA_OFF;		/* keep compiler quiet */
+}
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -128,7 +196,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 	if (IsSet(supported_opts, SUBOPT_CREATE_SLOT))
 		opts->create_slot = true;
 	if (IsSet(supported_opts, SUBOPT_COPY_DATA))
-		opts->copy_data = true;
+		opts->copy_data = COPY_DATA_ON;
 	if (IsSet(supported_opts, SUBOPT_REFRESH))
 		opts->refresh = true;
 	if (IsSet(supported_opts, SUBOPT_BINARY))
@@ -196,7 +264,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 				errorConflictingDefElem(defel, pstate);
 
 			opts->specified_opts |= SUBOPT_COPY_DATA;
-			opts->copy_data = defGetBoolean(defel);
+			opts->copy_data = defGetCopyData(defel);
 		}
 		else if (IsSet(supported_opts, SUBOPT_SYNCHRONOUS_COMMIT) &&
 				 strcmp(defel->defname, "synchronous_commit") == 0)
@@ -352,12 +420,12 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("%s and %s are mutually exclusive options",
-							"connect = false", "copy_data = true")));
+							"connect = false", "copy_data = true/force")));
 
 		/* Change the defaults of other options. */
 		opts->enabled = false;
 		opts->create_slot = false;
-		opts->copy_data = false;
+		opts->copy_data = COPY_DATA_OFF;
 	}
 
 	/*
@@ -680,6 +748,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		PG_TRY();
 		{
 			check_publications(wrconn, publications);
+			check_pub_table_subscribed(wrconn, publications, opts.copy_data,
+									   opts.origin, NULL, 0);
 
 			/*
 			 * Set sync state based on if we were asked to do data copy or
@@ -775,7 +845,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 }
 
 static void
-AlterSubscription_refresh(Subscription *sub, bool copy_data,
+AlterSubscription_refresh(Subscription *sub, CopyData copy_data,
 						  List *validate_publications)
 {
 	char	   *err;
@@ -786,6 +856,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 	ListCell   *lc;
 	int			off;
 	int			remove_rel_len;
+	int			subrel_count;
 	Relation	rel = NULL;
 	typedef struct SubRemoveRels
 	{
@@ -815,13 +886,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
 		/* Get local table list. */
 		subrel_states = GetSubscriptionRelations(sub->oid, false);
+		subrel_count = list_length(subrel_states);
 
 		/*
 		 * Build qsorted array of local table oids for faster lookup. This can
 		 * potentially contain all tables in the database so speed of lookup
 		 * is important.
 		 */
-		subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+		subrel_local_oids = palloc(subrel_count * sizeof(Oid));
 		off = 0;
 		foreach(lc, subrel_states)
 		{
@@ -829,14 +901,19 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
 			subrel_local_oids[off++] = relstate->relid;
 		}
-		qsort(subrel_local_oids, list_length(subrel_states),
+		qsort(subrel_local_oids, subrel_count,
 			  sizeof(Oid), oid_cmp);
 
+		/* Check whether we can allow copy of newly added relations. */
+		check_pub_table_subscribed(wrconn, sub->publications, copy_data,
+								   sub->origin, subrel_local_oids,
+								   subrel_count);
+
 		/*
 		 * Rels that we want to remove from subscription and drop any slots
 		 * and origins corresponding to them.
 		 */
-		sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+		sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
 
 		/*
 		 * Walk over the remote tables and try to match them to locally known
@@ -862,7 +939,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			pubrel_local_oids[off++] = relid;
 
 			if (!bsearch(&relid, subrel_local_oids,
-						 list_length(subrel_states), sizeof(Oid), oid_cmp))
+						 subrel_count, sizeof(Oid), oid_cmp))
 			{
 				AddSubscriptionRelState(sub->oid, relid,
 										copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
@@ -881,7 +958,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 			  sizeof(Oid), oid_cmp);
 
 		remove_rel_len = 0;
-		for (off = 0; off < list_length(subrel_states); off++)
+		for (off = 0; off < subrel_count; off++)
 		{
 			Oid			relid = subrel_local_oids[off];
 
@@ -1781,6 +1858,122 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
 	table_close(rel, RowExclusiveLock);
 }
 
+/*
+ * Check and throw an error if the publisher has subscribed to the same table
+ * from some other publisher. This check is required only if "copy_data = on"
+ * and "origin = NONE" for CREATE SUBSCRIPTION and
+ * ALTER SUBSCRIPTION ... REFRESH statements to prevent the publisher from
+ * replicating data that has an origin.
+ *
+ * This check need not be performed on the tables that are already added,
+ * since incremental sync for such tables happens through WAL and the origin of
+ * the data can be identified from the WAL records.
+ *
+ * subrel_local_oids contains the list of relation oids that are already
+ * present on the subscriber.
+ */
+static void
+check_pub_table_subscribed(WalReceiverConn *wrconn, List *publications,
+						   CopyData copydata, char *origin,
+						   Oid *subrel_local_oids, int subrel_count)
+{
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[2] = {TEXTOID, TEXTOID};
+
+	if (copydata != COPY_DATA_ON || !origin ||
+		(pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
+		return;
+
+	initStringInfo(&cmd);
+	appendStringInfoString(&cmd,
+						   "SELECT DISTINCT N.nspname AS schemaname,\n"
+						   "				C.relname AS tablename\n"
+						   "FROM pg_publication P,\n"
+						   "	 LATERAL pg_get_publication_tables(P.pubname) GPT\n"
+						   "	 LEFT JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
+						   "	 pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
+						   "WHERE C.oid = GPT.relid AND PS.srrelid IS NOT NULL AND P.pubname IN (");
+	get_publications_str(publications, &cmd, true);
+	appendStringInfoChar(&cmd, ')');
+
+	res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+	pfree(cmd.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not receive list of replicated tables from the publisher: %s",
+						res->err)));
+
+	/* Process tables. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *nspname;
+		char	   *relname;
+		bool		isnull;
+		bool		isnewtable = true;
+
+		nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+		relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull));
+		Assert(!isnull);
+
+		/* Skip already added tables */
+		if (subrel_count)
+		{
+			RangeVar   *rv;
+			Oid			relid;
+
+			rv = makeRangeVar(nspname, relname, -1);
+			relid = RangeVarGetRelid(rv, AccessShareLock, false);
+
+			/* Check for supported relkind. */
+			CheckSubscriptionRelkind(get_rel_relkind(relid),
+									 rv->schemaname, rv->relname);
+
+			if (bsearch(&relid, subrel_local_oids,
+						subrel_count, sizeof(Oid), oid_cmp))
+				isnewtable = false;
+		}
+
+		ExecClearTuple(slot);
+
+		if (!isnewtable)
+		{
+			pfree(nspname);
+			pfree(relname);
+			continue;
+		}
+
+		/*
+		 * Throw an error if the publisher has subscribed to the same table
+		 * from some other publisher. We cannot know the origin of data during
+		 * the initial sync. Data origins can be found only from the WAL by
+		 * looking at the origin id.
+		 *
+		 * XXX: For simplicity, we don't check whether the table has any data
+		 * or not. If the table doesn't have any data then we don't need to
+		 * distinguish between data having origin and data not having origin so
+		 * we can avoid throwing an error in that case.
+		 */
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("could not replicate table \"%s.%s\"",
+					   nspname, relname),
+				errdetail("CREATE/ALTER SUBSCRIPTION with %s and %s is not allowed when the publisher has subscribed same table.",
+						  "origin = none",  "copy_data = true"),
+				errhint("Use CREATE/ALTER SUBSCRIPTION with %s.",
+						"copy_data = false/force"));
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+}
+
 /*
  * Get the list of tables which belong to specified publications on the
  * publisher connection.
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index ef0ebf96b9..69f4c85834 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -46,8 +46,18 @@ CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PU
 ERROR:  must be superuser to create subscriptions
 SET SESSION AUTHORIZATION 'regress_subscription_user';
 -- fail - invalid option combinations
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data);
+ERROR:  connect = false and copy_data = true/force are mutually exclusive options
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true);
-ERROR:  connect = false and copy_data = true are mutually exclusive options
+ERROR:  connect = false and copy_data = true/force are mutually exclusive options
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = on);
+ERROR:  connect = false and copy_data = true/force are mutually exclusive options
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = 1);
+ERROR:  connect = false and copy_data = true/force are mutually exclusive options
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = force);
+ERROR:  connect = false and copy_data = true/force are mutually exclusive options
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = 2);
+ERROR:  copy_data requires a boolean or "force"
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true);
 ERROR:  connect = false and enabled = true are mutually exclusive options
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, create_slot = true);
@@ -93,6 +103,16 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 
 DROP SUBSCRIPTION regress_testsub3;
 DROP SUBSCRIPTION regress_testsub4;
+-- ok - valid copy_data options
+CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = false);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = off);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = 0);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
+DROP SUBSCRIPTION regress_testsub5;
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 4425fafc46..303b62e754 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -39,7 +39,12 @@ CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PU
 SET SESSION AUTHORIZATION 'regress_subscription_user';
 
 -- fail - invalid option combinations
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data);
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = true);
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = on);
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = 1);
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = force);
+CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, copy_data = 2);
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, enabled = true);
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, create_slot = true);
 CREATE SUBSCRIPTION regress_testsub2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, enabled = true);
@@ -66,6 +71,15 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 DROP SUBSCRIPTION regress_testsub3;
 DROP SUBSCRIPTION regress_testsub4;
 
+-- ok - valid copy_data options
+CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = false);
+CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = off);
+CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, copy_data = 0);
+
+DROP SUBSCRIPTION regress_testsub3;
+DROP SUBSCRIPTION regress_testsub4;
+DROP SUBSCRIPTION regress_testsub5;
+
 -- fail - invalid connection string
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 
diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl
index e9241d2996..e992b6ecb0 100644
--- a/src/test/subscription/t/030_origin.pl
+++ b/src/test/subscription/t/030_origin.pl
@@ -1,13 +1,115 @@
 
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Test the CREATE SUBSCRIPTION 'origin' parameter.
+# Test the CREATE SUBSCRIPTION 'origin' parameter and its interaction with
+# 'copy_data' parameter.
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+my $result;
+my $stdout;
+my $stderr;
+
+my $subname_AB = 'tap_sub_A_B';
+my $subname_AC = 'tap_sub_A_C';
+my $subname_BA = 'tap_sub_B_A';
+my $subname_BC = 'tap_sub_B_C';
+my $subname_CA = 'tap_sub_C_A';
+my $subname_CB = 'tap_sub_C_B';
+
+# Detach node_C from the node-group of (node_A, node_B, node_C) and clean the
+# table contents from all nodes.
+sub detach_node_clean_table_data
+{
+	my ($node_A, $node_B, $node_C) = @_;
+	$node_A->safe_psql('postgres', "DROP SUBSCRIPTION $subname_AC");
+	$node_B->safe_psql('postgres', "DROP SUBSCRIPTION $subname_BC");
+	$node_C->safe_psql('postgres', "DROP SUBSCRIPTION $subname_CA");
+	$node_C->safe_psql('postgres', "DROP SUBSCRIPTION $subname_CB");
+
+	$result =
+	  $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+	is($result, qq(1), 'check subscription was dropped on subscriber');
+
+	$result =
+	  $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+	is($result, qq(1), 'check subscription was dropped on subscriber');
+
+	$result =
+	  $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription");
+	is($result, qq(0), 'check subscription was dropped on subscriber');
+
+	$result = $node_A->safe_psql('postgres',
+		"SELECT count(*) FROM pg_replication_slots");
+	is($result, qq(1), 'check replication slot was dropped on publisher');
+
+	$result = $node_B->safe_psql('postgres',
+		"SELECT count(*) FROM pg_replication_slots");
+	is($result, qq(1), 'check replication slot was dropped on publisher');
+
+	$result = $node_C->safe_psql('postgres',
+		"SELECT count(*) FROM pg_replication_slots");
+	is($result, qq(0), 'check replication slot was dropped on publisher');
+
+	$node_A->safe_psql('postgres', "TRUNCATE tab");
+	$node_B->safe_psql('postgres', "TRUNCATE tab");
+	$node_C->safe_psql('postgres', "TRUNCATE tab");
+}
+
+# Subroutine to verify the data is replicated successfully.
+sub verify_data
+{
+	my ($node_A, $node_B, $node_C, $expect) = @_;
+
+	$node_A->wait_for_catchup($subname_BA);
+	$node_A->wait_for_catchup($subname_CA);
+	$node_B->wait_for_catchup($subname_AB);
+	$node_B->wait_for_catchup($subname_CB);
+	$node_C->wait_for_catchup($subname_AC);
+	$node_C->wait_for_catchup($subname_BC);
+
+	# check that data is replicated to all the nodes
+	$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+	is($result, qq($expect), 'Data is replicated as expected');
+
+	$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+	is($result, qq($expect), 'Data is replicated as expected');
+
+	$result = $node_C->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+	is($result, qq($expect), 'Data is replicated as expected');
+}
+
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+
+# Subroutine to create a subscription and wait until the initial sync is
+# completed. It expects the subscriber node, publisher node, subscription
+# name, publisher connection string, publication name and subscription
+# parameters as input.
+sub create_subscription
+{
+	my ($node_subscriber, $node_publisher, $sub_name, $node_connstr,
+		$pub_name, $sub_params)
+	  = @_;
+
+	# Application_name is always assigned the same value as the subscription
+	# name.
+	$node_subscriber->safe_psql(
+		'postgres', "
+                CREATE SUBSCRIPTION $sub_name
+                CONNECTION '$node_connstr application_name=$sub_name'
+                PUBLICATION $pub_name
+                WITH ($sub_params)");
+	$node_publisher->wait_for_catchup($sub_name);
+
+	# also wait for initial table sync to finish
+	$node_subscriber->poll_query_until('postgres', $synced_query)
+	  or die "Timed out while waiting for subscriber to synchronize data";
+}
+
 ###############################################################################
 # Setup a bidirectional logical replication between node_A & node_B
 ###############################################################################
@@ -32,41 +134,17 @@ $node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
 # node_A (pub) -> node_B (sub)
 my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
 $node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab");
-my $appname_B1 = 'tap_sub_B1';
-$node_B->safe_psql(
-	'postgres', "
-	CREATE SUBSCRIPTION tap_sub_B1
-	CONNECTION '$node_A_connstr application_name=$appname_B1'
-	PUBLICATION tap_pub_A
-	WITH (origin = none)");
+create_subscription($node_B, $node_A, $subname_BA, $node_A_connstr,
+	'tap_pub_A', 'copy_data = on, origin = none');
 
 # node_B (pub) -> node_A (sub)
 my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
 $node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab");
-my $appname_A = 'tap_sub_A';
-$node_A->safe_psql(
-	'postgres', "
-	CREATE SUBSCRIPTION tap_sub_A
-	CONNECTION '$node_B_connstr application_name=$appname_A'
-	PUBLICATION tap_pub_B
-	WITH (origin = none, copy_data = off)");
-
-# Wait for subscribers to finish initialization
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
-
-# Also wait for initial table sync to finish
-my $synced_query =
-  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
-$node_A->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
-$node_B->poll_query_until('postgres', $synced_query)
-  or die "Timed out while waiting for subscriber to synchronize data";
+create_subscription($node_A, $node_B, $subname_AB, $node_B_connstr,
+	'tap_pub_B', 'copy_data = off, origin = none');
 
 is(1, 1, 'Bidirectional replication setup is complete');
 
-my $result;
-
 ###############################################################################
 # Check that bidirectional logical replication setup does not cause infinite
 # recursive insertion.
@@ -76,8 +154,8 @@ my $result;
 $node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);");
 $node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);");
 
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
 
 # check that transaction was committed on subscriber(s)
 $result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
@@ -93,8 +171,8 @@ is( $result, qq(11
 
 $node_A->safe_psql('postgres', "DELETE FROM tab;");
 
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
 
 ###############################################################################
 # Check that remote data of node_B (that originated from node_C) is not
@@ -117,36 +195,244 @@ $node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
 # node_C (pub) -> node_B (sub)
 my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
 $node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab");
+create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr,
+	'tap_pub_C', 'copy_data = on, origin = none');
+
+# insert a record
+$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
+
+$node_C->wait_for_catchup($subname_BC);
+$node_B->wait_for_catchup($subname_AB);
+$node_A->wait_for_catchup($subname_BA);
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(32), 'The node_C data replicated to node_B');
+
+# check that the data published from node_C to node_B is not sent to node_A
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(),
+	'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none'
+);
+
+# clear the operations done by this test
+$node_B->safe_psql(
+	'postgres', "
+        DROP SUBSCRIPTION $subname_BC");
+# no need to wait for catchup of delete operation performed in node_C as
+# the subscription for node_C publication has been dropped
+$node_C->safe_psql(
+	'postgres', "
+        DELETE FROM tab");
+
+# delete the node_C row on node_B and wait for node_A & node_B to catch up
+$node_B->safe_psql(
+	'postgres', "
+        DELETE FROM tab where a = 32");
+
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
+
+###############################################################################
+# Specify origin as 'none', which indicates that the publisher should only
+# replicate the changes that are generated locally on node_B. In this case,
+# however, node_B also subscribes to data from node_A, so node_B can have
+# remotely originated data from node_A. We throw an error in this case to
+# draw attention to the possibility of remote data.
+###############################################################################
+($result, $stdout, $stderr) = $node_A->psql(
+	'postgres', "
+        CREATE SUBSCRIPTION tap_sub_A2
+        CONNECTION '$node_B_connstr application_name=$subname_AB'
+        PUBLICATION tap_pub_B
+        WITH (origin = none, copy_data = on)");
+like(
+	$stderr,
+	qr/ERROR: ( [A-Z0-9]+:)? could not replicate table "public.tab"/,
+	"Create subscription with origin = none and copy_data = on when the publisher has subscribed to the same table"
+);
+
+# Creating subscription with origin as none and copy_data as force should be
+# successful when the publisher has replicated data
+$node_A->safe_psql(
+	'postgres', "
+        CREATE SUBSCRIPTION tap_sub_A2
+        CONNECTION '$node_B_connstr application_name=$subname_AC'
+        PUBLICATION tap_pub_B
+        WITH (origin = none, copy_data = force)");
+
+$node_B->wait_for_catchup($subname_AC);
+
+# also wait for initial table sync to finish
+$node_A->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Alter subscription ... refresh publication should be successful when no new
+# table is added
+$node_A->safe_psql(
+	'postgres', "
+        ALTER SUBSCRIPTION tap_sub_A2 REFRESH PUBLICATION");
+
+# Check Alter subscription ... refresh publication when there is a new
+# table that is subscribing data from a different publication
+$node_A->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)");
+$node_B->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)");
+
+# add a new table to the publications
+$node_A->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_A ADD TABLE tab_new");
+$node_B->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_B ADD TABLE tab_new");
 
-my $appname_B2 = 'tap_sub_B2';
 $node_B->safe_psql(
 	'postgres', "
-	CREATE SUBSCRIPTION tap_sub_B2
-	CONNECTION '$node_C_connstr application_name=$appname_B2'
-	PUBLICATION tap_pub_C
-	WITH (origin = none)");
+        ALTER SUBSCRIPTION $subname_BA REFRESH PUBLICATION");
 
-$node_C->wait_for_catchup($appname_B2);
+$node_B->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Alter subscription ... refresh publication should fail when a new table in
+# the publisher is subscribing data from a different publication
+($result, $stdout, $stderr) = $node_A->psql(
+	'postgres', "
+        ALTER SUBSCRIPTION tap_sub_A2 REFRESH PUBLICATION");
+like(
+	$stderr,
+	qr/ERROR: ( [A-Z0-9]+:)? could not replicate table "public.tab_new"/,
+	"Refresh publication when the publisher has subscribed to the new table"
+);
 
+# clear the operations done by this test
+$node_A->safe_psql('postgres', "DROP TABLE tab_new");
+$node_B->safe_psql('postgres', "DROP TABLE tab_new");
+$node_A->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_A2");
+$node_A->safe_psql(
+	'postgres', "
+        ALTER SUBSCRIPTION $subname_AB REFRESH PUBLICATION");
+$node_A->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+$node_B->safe_psql(
+	'postgres', "
+        ALTER SUBSCRIPTION $subname_BA REFRESH PUBLICATION");
 $node_B->poll_query_until('postgres', $synced_query)
   or die "Timed out while waiting for subscriber to synchronize data";
 
-# insert a record
-$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
+###############################################################################
+# Join 3rd node (node_C) to the existing 2 nodes (node_A & node_B) bidirectional
+# replication setup when the existing nodes (node_A & node_B) have pre-existing
+# data and the new node (node_C) does not have any data.
+###############################################################################
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(), 'Check existing data');
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(), 'Check existing data');
+
+$result = $node_C->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(), 'Check existing data');
+
+create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr,
+	'tap_pub_C', 'copy_data = off, origin = none');
+create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr,
+	'tap_pub_C', 'copy_data = off, origin = none');
+create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr,
+	'tap_pub_A', 'copy_data = force, origin = none');
+create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr,
+	'tap_pub_B', 'copy_data = off, origin = none');
+
+# insert some data in all the nodes
+$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (13);");
+$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (23);");
+$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (33);");
+
+verify_data(
+	$node_A, $node_B, $node_C, '13
+23
+33');
+
+detach_node_clean_table_data($node_A, $node_B, $node_C);
 
-$node_C->wait_for_catchup($appname_B2);
-$node_B->wait_for_catchup($appname_A);
-$node_A->wait_for_catchup($appname_B1);
+###############################################################################
+# Join 3rd node (node_C) to the existing 2 nodes (node_A & node_B) bidirectional
+# replication setup when neither the existing nodes (node_A & node_B) nor the
+# new node (node_C) have any data.
+###############################################################################
+$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(), 'Check existing data');
 
 $result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
-is($result, qq(32), 'The node_C data replicated to node_B');
+is($result, qq(), 'Check existing data');
+
+$result = $node_C->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(), 'Check existing data');
+
+create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr,
+	'tap_pub_C', 'copy_data = off, origin = none');
+create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr,
+	'tap_pub_C', 'copy_data = off, origin = none');
+create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr,
+	'tap_pub_A', 'copy_data = off, origin = none');
+create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr,
+	'tap_pub_B', 'copy_data = off, origin = none');
+
+# insert some data in all the nodes
+$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (14);");
+$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (24);");
+$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (34);");
+
+verify_data(
+	$node_A, $node_B, $node_C, '14
+24
+34');
+
+detach_node_clean_table_data($node_A, $node_B, $node_C);
+
+###############################################################################
+# Join 3rd node (node_C) to the existing 2 nodes (node_A & node_B) bidirectional
+# replication setup when the existing nodes (node_A & node_B) have no data and
+# the new node (node_C) has some pre-existing data.
+###############################################################################
+$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (35);");
 
-# check that the data published from node_C to node_B is not sent to node_A
 $result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
-is($result, qq(),
-	'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none'
+is($result, qq(), 'Check existing data');
+
+$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(), 'Check existing data');
+
+$result = $node_C->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
+is($result, qq(35), 'Check existing data');
+
+create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr,
+	'tap_pub_C', 'copy_data = on, origin = none');
+create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr,
+	'tap_pub_C', 'copy_data = on, origin = none');
+
+$node_C->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_C SET (publish='insert,update,delete');");
+
+$node_C->safe_psql('postgres', "TRUNCATE tab");
+
+# include truncates now
+$node_C->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_C SET (publish='insert,update,delete,truncate');"
 );
 
+create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr,
+	'tap_pub_A', 'copy_data = force, origin = none');
+create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr,
+	'tap_pub_B', 'copy_data = off, origin = none');
+
+# insert some data in all the nodes
+$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (16);");
+$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (26);");
+$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (36);");
+
+verify_data(
+	$node_A, $node_B, $node_C, '16
+26
+35
+36');
+
 # shutdown
 $node_B->stop('fast');
 $node_A->stop('fast');
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 35c9f1efce..32ca5483d0 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -456,6 +456,7 @@ ConvProcInfo
 ConversionLocation
 ConvertRowtypeExpr
 CookedConstraint
+CopyData
 CopyDest
 CopyFormatOptions
 CopyFromState
-- 
2.32.0

From 222baacf5babdb6b3521822e6f8bdbace094519b Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Mon, 27 Jun 2022 18:44:18 +0530
Subject: [PATCH v40 2/2] Document the steps for replication between primaries
 in various scenarios.

Document the steps for the following:
a) Setting up replication between two primaries.
b) Adding a new node when there is no table data on any of the nodes.
c) Adding a new node when table data is present on the existing nodes.
d) Generic steps for adding a new node to an existing set of primaries.
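
The ordering in (d) can be sketched with a small Python helper that lists the statements to run on each node, in order. The helper (plan_add_primary) is hypothetical and not part of the documented procedure. It assumes the following:
- the pub_<node> and sub_<subscriber>_<publisher> naming from the documented examples;
- the placeholder connection string 'dbname=foo host=<node> user=repuser' from the same examples;
- each lock is wrapped in BEGIN ... COMMIT, because LOCK TABLE only holds its lock until the end of the transaction, and each locking session stays open until its final COMMIT.

```python
from typing import List, Tuple


def conninfo(host: str) -> str:
    # Placeholder connection string copied from the documentation examples;
    # dbname=foo and user=repuser are example values, not requirements.
    return f"dbname=foo host={host} user=repuser"


def create_sub(subscriber: str, publisher: str, copy_data: str) -> str:
    # Names follow the sub_<subscriber>_<publisher> / pub_<node> convention
    # used in the documentation examples (an assumption of this sketch).
    return (
        f"CREATE SUBSCRIPTION sub_{subscriber}_{publisher} "
        f"CONNECTION '{conninfo(publisher)}' "
        f"PUBLICATION pub_{publisher} "
        f"WITH (copy_data = {copy_data}, origin = none);"
    )


def plan_add_primary(existing: List[str], new: str,
                     table: str = "t1") -> List[Tuple[str, str]]:
    """Return (node, sql) pairs following Step-1 .. Step-6 in order."""
    if not existing:
        raise ValueError("at least one existing primary is required")
    first, rest = existing[0], existing[1:]
    # LOCK TABLE only lasts until the end of the transaction, so each lock is
    # assumed to be taken in a separate session that stays open until COMMIT.
    lock = f"BEGIN; LOCK TABLE {table} IN EXCLUSIVE MODE;"
    steps: List[Tuple[str, str]] = []
    # Step-1: publication on the new primary.
    steps.append((new, f"CREATE PUBLICATION pub_{new} FOR TABLE {table};"))
    # Step-2: lock the table on the new primary.
    steps.append((new, lock))
    # Step-3: existing primaries subscribe to the new one without copying.
    for node in existing:
        steps.append((node, create_sub(node, new, "false")))
    # Step-4: lock the table on every existing primary except the first.
    for node in rest:
        steps.append((node, lock))
    # Step-5: the new primary copies the existing data from the first primary.
    steps.append((new, create_sub(new, first, "force")))
    # Step-6: the new primary subscribes to the remaining primaries, no copy.
    for node in rest:
        steps.append((new, create_sub(new, node, "false")))
    # Setup complete: end the locking transactions (assumed final step).
    for node in [new] + rest:
        steps.append((node, "COMMIT;"))
    return steps


if __name__ == "__main__":
    for node, sql in plan_add_primary(["primary1", "primary2", "primary3"],
                                      "primary4"):
        print(f"{node}=# {sql}")
```

Only the first existing primary is used with copy_data = force. That way the new primary receives the already-synchronized table data exactly once, and the locks on the other existing primaries keep that copy consistent.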

Author: Vignesh C
Reviewed-By: Peter Smith, Amit Kapila, Shi yu, Jonathan Katz, Wang wei
Discussion: https://www.postgresql.org/message-id/CALDaNm0gwjY_4HFxvvty01BOT01q_fJLKQ3pWP9=9orqubh...@mail.gmail.com
---
 doc/src/sgml/logical-replication.sgml     | 440 ++++++++++++++++++++++
 doc/src/sgml/ref/create_subscription.sgml |   5 +-
 2 files changed, 444 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index bdf1e7b727..8c971e0220 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -514,6 +514,446 @@ test_sub=# SELECT * FROM t3;
 
  </sect1>
 
+ <sect1 id="replication-between-primaries">
+  <title>Replication between primaries</title>
+
+   <para>
+    Replication between primaries is useful for creating a multi-master
+    database environment for replicating write operations performed by any of
+    the member primaries. The steps to create replication between primaries in
+    various scenarios are given below.
+   </para>
+
+   <note>
+    <para>
+     The user is responsible for designing their schemas in a way that
+     minimizes the risk of conflicts. See <xref linkend="logical-replication-conflicts"/>
+     for the details of logical replication conflicts. The logical replication
+     restrictions also apply to replication between primaries. See
+     <xref linkend="logical-replication-restrictions"/> for the details of
+     logical replication restrictions.
+    </para>
+   </note>
+
+   <warning>
+    <para>
+     Setting up replication between primaries requires multiple steps to be
+     performed on various primaries. Because not all operations are
+     transactional, the user is advised to take backups. Backups can be taken
+     as described in <xref linkend="backup-base-backup"/>.
+    </para>
+   </warning>
+
+  <sect2 id="setting-replication-between-two-primaries">
+   <title>Setting up replication between two primaries</title>
+   <para>
+    The following steps demonstrate how to set up replication between two
+    primaries (<literal>primary1</literal> and <literal>primary2</literal>)
+    when there is no table data present on either primary:
+   </para>
+
+   <para>
+    Create a publication on <literal>primary1</literal>:
+<programlisting>
+primary1=# CREATE PUBLICATION pub_primary1 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting></para>
+
+   <para>
+    Create a publication on <literal>primary2</literal>:
+<programlisting>
+primary2=# CREATE PUBLICATION pub_primary2 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting></para>
+
+   <para>
+    Lock the table <literal>t1</literal> on <literal>primary1</literal> and
+    <literal>primary2</literal> in <literal>EXCLUSIVE</literal> mode until the
+    setup is completed.
+   </para>
+
+   <para>
+    Create a subscription on <literal>primary2</literal> to subscribe to
+    <literal>primary1</literal>:
+<programlisting>
+primary2=# CREATE SUBSCRIPTION sub_primary2_primary1
+primary2-# CONNECTION 'dbname=foo host=primary1 user=repuser'
+primary2-# PUBLICATION pub_primary1
+primary2-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>primary1</literal> to subscribe to
+    <literal>primary2</literal>:
+<programlisting>
+primary1=# CREATE SUBSCRIPTION sub_primary1_primary2
+primary1-# CONNECTION 'dbname=foo host=primary2 user=repuser'
+primary1-# PUBLICATION pub_primary2
+primary1-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Now the replication setup between primaries <literal>primary1</literal> and
+    <literal>primary2</literal> is complete. Any incremental changes from
+    <literal>primary1</literal> will be replicated to
+    <literal>primary2</literal>, and any incremental changes from
+    <literal>primary2</literal> will be replicated to
+    <literal>primary1</literal>.
+   </para>
+  </sect2>
+
+  <sect2 id="add-new-primary">
+   <title>Adding a new primary when there is no table data on any of the primaries</title>
+   <para>
+    The following steps demonstrate adding a new primary
+    (<literal>primary3</literal>) to the existing primaries
+    (<literal>primary1</literal> and <literal>primary2</literal>) when there is
+    no <literal>t1</literal> data on any of the primaries. This requires
+    creating subscriptions on <literal>primary1</literal> and
+    <literal>primary2</literal> to replicate the data from
+    <literal>primary3</literal> and creating subscriptions on
+    <literal>primary3</literal> to replicate data from
+    <literal>primary1</literal> and <literal>primary2</literal>. Note: These
+    steps assume that the replication between the primaries
+    <literal>primary1</literal> and <literal>primary2</literal> has already
+    been set up.
+   </para>
+
+   <para>
+    Create a publication on <literal>primary3</literal>:
+<programlisting>
+primary3=# CREATE PUBLICATION pub_primary3 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting></para>
+
+   <para>
+    Lock table <literal>t1</literal> on all the primaries
+    <literal>primary1</literal>, <literal>primary2</literal> and
+    <literal>primary3</literal> in <literal>EXCLUSIVE</literal> mode until the
+    setup is completed.
+   </para>
+
+   <para>
+    Create a subscription on <literal>primary1</literal> to subscribe to
+    <literal>primary3</literal>:
+<programlisting>
+primary1=# CREATE SUBSCRIPTION sub_primary1_primary3
+primary1-# CONNECTION 'dbname=foo host=primary3 user=repuser'
+primary1-# PUBLICATION pub_primary3
+primary1-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>primary2</literal> to subscribe to
+    <literal>primary3</literal>:
+<programlisting>
+primary2=# CREATE SUBSCRIPTION sub_primary2_primary3
+primary2-# CONNECTION 'dbname=foo host=primary3 user=repuser'
+primary2-# PUBLICATION pub_primary3
+primary2-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>primary3</literal> to subscribe to
+    <literal>primary1</literal>:
+<programlisting>
+primary3=# CREATE SUBSCRIPTION sub_primary3_primary1
+primary3-# CONNECTION 'dbname=foo host=primary1 user=repuser'
+primary3-# PUBLICATION pub_primary1
+primary3-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>primary3</literal> to subscribe to
+    <literal>primary2</literal>:
+<programlisting>
+primary3=# CREATE SUBSCRIPTION sub_primary3_primary2
+primary3-# CONNECTION 'dbname=foo host=primary2 user=repuser'
+primary3-# PUBLICATION pub_primary2
+primary3-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Now the replication setup between primaries <literal>primary1</literal>,
+    <literal>primary2</literal> and <literal>primary3</literal> is complete.
+    Incremental changes made on any primary will be replicated to the other two
+    primaries.
+   </para>
+  </sect2>
+
+  <sect2 id="add-new-primary-data-on-existing-primary">
+   <title>Adding a new primary when table data is present on the existing primaries</title>
+   <para>
+    The following steps demonstrate adding a new primary
+    (<literal>primary3</literal>)
+    that has no <literal>t1</literal> data to the existing primaries
+    (<literal>primary1</literal> and <literal>primary2</literal>) where
+    <literal>t1</literal> data is present. This needs similar steps; the only
+    change required here is that <literal>primary3</literal> should create a
+    subscription with <literal>copy_data = force</literal> to one of the
+    existing primaries so it can receive the existing <literal>t1</literal>
+    data during initial data synchronization. Note: These steps assume that
+    the replication between the primaries <literal>primary1</literal> and
+    <literal>primary2</literal> has already been set up, and that the
+    pre-existing data in table <literal>t1</literal> is already synchronized
+    on both of those primaries.
+   </para>
+
+   <para>
+    Create a publication on <literal>primary3</literal>:
+<programlisting>
+primary3=# CREATE PUBLICATION pub_primary3 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting></para>
+
+   <para>
+    Lock table <literal>t1</literal> on <literal>primary2</literal> and
+    <literal>primary3</literal> in <literal>EXCLUSIVE</literal> mode until the
+    setup is completed. There is no need to lock table <literal>t1</literal> on
+    <literal>primary1</literal> because any data changes made will be
+    synchronized while creating the subscription with
+    <literal>copy_data = force</literal>.
+   </para>
+
+   <para>
+    Create a subscription on <literal>primary1</literal> to subscribe to
+    <literal>primary3</literal>:
+<programlisting>
+primary1=# CREATE SUBSCRIPTION sub_primary1_primary3
+primary1-# CONNECTION 'dbname=foo host=primary3 user=repuser'
+primary1-# PUBLICATION pub_primary3
+primary1-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>primary2</literal> to subscribe to
+    <literal>primary3</literal>:
+<programlisting>
+primary2=# CREATE SUBSCRIPTION sub_primary2_primary3
+primary2-# CONNECTION 'dbname=foo host=primary3 user=repuser'
+primary2-# PUBLICATION pub_primary3
+primary2-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>primary3</literal> to subscribe to
+    <literal>primary1</literal>. Use <literal>copy_data = force</literal> so
+    that the existing table data is copied during initial sync:
+<programlisting>
+primary3=# CREATE SUBSCRIPTION sub_primary3_primary1
+primary3-# CONNECTION 'dbname=foo host=primary1 user=repuser'
+primary3-# PUBLICATION pub_primary1
+primary3-# WITH (copy_data = force, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>primary3</literal> to subscribe to
+    <literal>primary2</literal>. Use <literal>copy_data = false</literal>
+    because the initial table data will already have been
+    copied in the previous step:
+<programlisting>
+primary3=# CREATE SUBSCRIPTION sub_primary3_primary2
+primary3-# CONNECTION 'dbname=foo host=primary2 user=repuser'
+primary3-# PUBLICATION pub_primary2
+primary3-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Now the replication setup between primaries <literal>primary1</literal>,
+    <literal>primary2</literal> and <literal>primary3</literal> is complete.
+    Incremental changes made on any primary will be replicated to the other two
+    primaries.
+   </para>
+  </sect2>
+
+  <sect2 id="generic-steps-add-new-primary">
+   <title>Generic steps for adding a new primary to an existing set of primaries</title>
+   <para>
+    Step-1. Create a publication on the new primary.
+   </para>
+   <para>
+    Step-2. Lock the required tables of the new primary in
+    <literal>EXCLUSIVE</literal> mode until the setup is complete. (This lock
+    is necessary to prevent any modifications from happening on the new
+    primary. If data modifications occurred after Step-3, there is a chance
+    they could be published to the first primary and then synchronized back to
+    the new primary while creating the subscription in Step-5. This would
+    result in inconsistent data).
+   </para>
+   <para>
+    Step-3. Create subscriptions on existing primaries to the publication on
+    the new primary with <literal>origin = none</literal> and
+    <literal>copy_data = false</literal>. (Using
+    <literal>copy_data = false</literal> is OK here because the published
+    tables of the new primary are assumed to have no pre-existing data).
+   </para>
+   <para>
+    Step-4. Lock the required tables of the existing primaries except the first
+    primary in <literal>EXCLUSIVE</literal> mode until the setup is complete.
+    (This lock is necessary to prevent any modifications from happening. If
+    data modifications occur, there is a chance that modifications done between
+    Step-5 and Step-6 will not be synchronized to the new primary. This would
+    result in inconsistent data. There is no need to lock the required tables
+    on the first primary because any data changes made will be synchronized
+    while creating the subscription with <literal>copy_data = force</literal>).
+   </para>
+   <para>
+    Step-5. Create a subscription on the new primary to the publication on the
+    first primary with <literal>origin = none</literal> and
+    <literal>copy_data = force</literal>. (This will copy the same table data
+    from the existing primaries to the new primary).
+   </para>
+   <para>
+    Step-6. Create subscriptions on the new primary to publications on the
+    remaining primaries with <literal>origin = none</literal> and
+    <literal>copy_data = false</literal>. (Using <literal>copy_data = false</literal>
+    is OK here because the data of the existing primaries was already copied
+    to the new primary in Step-5).
+   </para>
+
+   <para>
+    Let's see an example using the above steps for adding a new primary
+    (<literal>primary4</literal>) to the existing primaries
+    (<literal>primary1</literal>, <literal>primary2</literal> and
+    <literal>primary3</literal>). Note: These steps assume that the replication
+    between the primaries (<literal>primary1</literal>,
+    <literal>primary2</literal> and <literal>primary3</literal>) has already
+    been set up, and that the pre-existing data in table <literal>t1</literal>
+    is already synchronized on all of them.
+   </para>
+
+   <para>
+    Step-1. Create a publication on <literal>primary4</literal>:
+<programlisting>
+primary4=# CREATE PUBLICATION pub_primary4 FOR TABLE t1;
+CREATE PUBLICATION
+</programlisting></para>
+
+   <para>
+    Step-2. Lock table <literal>t1</literal> on <literal>primary4</literal> in
+    <literal>EXCLUSIVE</literal> mode until the setup is completed.
+   </para>
+
+   <para>
+    Step-3. Create subscriptions on existing primaries to the publication on
+    the new primary with <literal>origin = none</literal> and
+    <literal>copy_data = false</literal>.
+   </para>
+
+   <para>
+    Create a subscription on <literal>primary1</literal> to subscribe to
+    <literal>primary4</literal>:
+<programlisting>
+primary1=# CREATE SUBSCRIPTION sub_primary1_primary4
+primary1-# CONNECTION 'dbname=foo host=primary4 user=repuser'
+primary1-# PUBLICATION pub_primary4
+primary1-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>primary2</literal> to subscribe to
+    <literal>primary4</literal>:
+<programlisting>
+primary2=# CREATE SUBSCRIPTION sub_primary2_primary4
+primary2-# CONNECTION 'dbname=foo host=primary4 user=repuser'
+primary2-# PUBLICATION pub_primary4
+primary2-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>primary3</literal> to subscribe to
+    <literal>primary4</literal>:
+<programlisting>
+primary3=# CREATE SUBSCRIPTION sub_primary3_primary4
+primary3-# CONNECTION 'dbname=foo host=primary4 user=repuser'
+primary3-# PUBLICATION pub_primary4
+primary3-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Step-4. Lock table <literal>t1</literal> on <literal>primary2</literal> and
+    <literal>primary3</literal> in <literal>EXCLUSIVE</literal> mode until the
+    setup is completed. There is no need to lock table <literal>t1</literal> on
+    <literal>primary1</literal> because any data changes made will be
+    synchronized while creating the subscription with
+    <literal>copy_data = force</literal>.
+   </para>
+
+   <para>
+    Step-5. Create a subscription on the new primary to the publication on the
+    first primary with <literal>origin = none</literal> and
+    <literal>copy_data = force</literal>.
+   </para>
+
+   <para>
+    Create a subscription on <literal>primary4</literal> to subscribe to
+    <literal>primary1</literal>. Use <literal>copy_data = force</literal> so
+    that the existing table data is copied during initial sync:
+<programlisting>
+primary4=# CREATE SUBSCRIPTION sub_primary4_primary1
+primary4-# CONNECTION 'dbname=foo host=primary1 user=repuser'
+primary4-# PUBLICATION pub_primary1
+primary4-# WITH (copy_data = force, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>primary4</literal> to subscribe to
+    <literal>primary2</literal>. Use <literal>copy_data = false</literal>
+    because the initial table data will already have been
+    copied in the previous step:
+<programlisting>
+primary4=# CREATE SUBSCRIPTION sub_primary4_primary2
+primary4-# CONNECTION 'dbname=foo host=primary2 user=repuser'
+primary4-# PUBLICATION pub_primary2
+primary4-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Create a subscription on <literal>primary4</literal> to subscribe to
+    <literal>primary3</literal>. Use <literal>copy_data = false</literal>
+    because the initial table data will already have been
+    copied in the previous step:
+<programlisting>
+primary4=# CREATE SUBSCRIPTION sub_primary4_primary3
+primary4-# CONNECTION 'dbname=foo host=primary3 user=repuser'
+primary4-# PUBLICATION pub_primary3
+primary4-# WITH (copy_data = false, origin = none);
+CREATE SUBSCRIPTION
+</programlisting></para>
+
+   <para>
+    Now the replication setup between primaries <literal>primary1</literal>,
+    <literal>primary2</literal>, <literal>primary3</literal> and
+    <literal>primary4</literal> is complete. Incremental changes made on any
+    primary will be replicated to the other three primaries.
+   </para>
+  </sect2>
+
+  <sect2 id="add-primary-data-present-on-new-primary">
+   <title>Adding a new primary that has existing table data</title>
+   <note>
+    <para>
+     Adding a new primary that has existing table data is not supported.
+    </para>
+   </note>
+  </sect2>
+ </sect1>
+
  <sect1 id="logical-replication-row-filter">
   <title>Row Filters</title>
 
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index a2cd3c211c..31b89e8dc4 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -407,7 +407,10 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    subscribed to the same table from other publishers and, if so, throw an
    error to prevent possible non-local data from being copied. The user can
    override this check and continue with the copy operation by specifying
-   <literal>copy_data = force</literal>.
+   <literal>copy_data = force</literal>. Refer to
+   <xref linkend="replication-between-primaries"/> for how
+   <literal>copy_data</literal> and <literal>origin</literal> can be used to
+   set up replication between primaries.
   </para>
 
  </refsect1>
-- 
2.32.0
