On Thu, 27 Apr 2023 at 13:18, Hayato Kuroda (Fujitsu)
<kuroda.hay...@fujitsu.com> wrote:
>
> Dear Julien,
>
> Thank you for updating the patch! The following are my comments.
>
> 01. documentation
>
> This page lists the steps to upgrade a server with pg_upgrade. Should we also
> describe the subscriber? IIUC, it is sufficient to add a note to "Run pg_upgrade",
> like "Apart from streaming replication standbys, a subscriber node can be upgraded
> via pg_upgrade. In that case we strongly recommend using
> --preserve-subscription-state".

This option has been removed; preserving the subscription state is now the default.

> 02. AlterSubscription
>
> I agree that the oid must be preserved between nodes, but I'm still afraid that
> the given oid is unconditionally trusted and added to pg_subscription_rel.
> I think we can check the existence of the relation via SearchSysCache1(RELOID,
> ObjectIdGetDatum(relid)). Of course the check is optional, so it should be
> executed only when USE_ASSERT_CHECKING is on. Thoughts?

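Modified. binary_upgrade_create_sub_rel_state now checks that the relation
exists via SearchSysCache1(RELOID, ...) and raises an error if it does not.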

> 03. main
>
> Currently --preserve-subscription-state and --no-subscriptions can be used
> together, but the situation is quite unnatural. Shouldn't we exclude them?

This option has been removed, so this scenario can no longer happen.

> 04. getSubscriptionTables
>
>
> ```
> +       SubRelInfo *rels = NULL;
> ```
>
> The variable is used only inside the loop, so the definition should also be
> moved there.

The logic has changed slightly, so the variable needs to stay outside the loop.

> 05. getSubscriptionTables
>
> ```
> +                       nrels = atooid(PQgetvalue(res, i, i_nrels));
> ```
>
> atoi() should be used instead of atooid().

Modified
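
For reference, here is a standalone sketch of why the distinction matters. It
is not code from the patch: parse_count and parse_oid are hypothetical names,
and the Oid typedef and atooid macro are reproduced only so the example
compiles on its own. atooid() parses through strtoul() and casts to an
unsigned Oid, so it is the wrong tool for a signed count.

```c
#include <stdlib.h>

/* Stand-ins for PostgreSQL's Oid type and atooid() macro (assumed here). */
typedef unsigned int Oid;
#define atooid(x) ((Oid) strtoul((x), NULL, 10))

/* Hypothetical helper: parse a signed count, as atoi() does. */
int
parse_count(const char *s)
{
    return atoi(s);
}

/* Hypothetical helper: parse an object identifier, as atooid() does. */
Oid
parse_oid(const char *s)
{
    return atooid(s);
}
```

Both helpers agree on ordinary non-negative input; they differ in the result
type and in how a value such as "-1" is interpreted.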

> 06. getSubscriptionTables
>
> ```
> +                       subinfo = findSubscriptionByOid(cur_srsubid);
> +
> +                       nrels = atooid(PQgetvalue(res, i, i_nrels));
> +                       rels = pg_malloc(nrels * sizeof(SubRelInfo));
> +
> +                       subinfo->subrels = rels;
> +                       subinfo->nrels = nrels;
> ```
>
> Maybe it never occurs, but findSubscriptionByOid() can return NULL. In that case,
> accessing its attributes will lead to a segfault. Some handling is needed.

This should not happen, but I have added a fatal error for this case.
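
The pattern is roughly the following. This is a simplified, standalone sketch
rather than the pg_dump code itself: SubInfo, find_sub_by_oid and
sub_name_or_null are hypothetical stand-ins, and where the sketch returns NULL
the patch calls pg_fatal().

```c
#include <stddef.h>

typedef unsigned int Oid;

/* Hypothetical, simplified stand-in for pg_dump's SubscriptionInfo. */
typedef struct SubInfo
{
    Oid         oid;
    const char *name;
} SubInfo;

/* Returns the matching entry, or NULL when no subscription has this OID. */
const SubInfo *
find_sub_by_oid(const SubInfo *subs, size_t nsubs, Oid oid)
{
    for (size_t i = 0; i < nsubs; i++)
    {
        if (subs[i].oid == oid)
            return &subs[i];
    }
    return NULL;
}

/* Looks up the subscription and checks the result before dereferencing it. */
const char *
sub_name_or_null(const SubInfo *subs, size_t nsubs, Oid oid)
{
    const SubInfo *sub = find_sub_by_oid(subs, nsubs, oid);

    if (sub == NULL)
        return NULL;    /* the patch reports a fatal error here instead */
    return sub->name;
}
```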

> 07. dumpSubscription
>
> Hmm, SubRelInfos are still dumped in dumpSubscription(). I think this style
> breaks the conventions of pg_dump. I think another dump function is needed. Please
> see dumpPublicationTable() and dumpPublicationNamespace(). If you have a reason
> to use this style, some comments describing it are needed.

Modified

> 08. _SubRelInfo
>
> If you address the above comment, a DumpableObject must be added as a new
> attribute.

Modified

> 09. check_for_subscription_state
>
> ```
> +                       for (int i = 0; i < ntup; i++)
> +                       {
> +                               is_error = true;
> +                               pg_log(PG_WARNING,
> +                                          "\nWARNING:  subscription \"%s\" has an invalid remote_lsn",
> +                                          PQgetvalue(res, 0, 0));
> +                       }
> ```
>
> The second argument should be i, so that the name of each subscription is
> reported when there is more than one.

Modified
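
As a standalone illustration of the fix (not the pg_upgrade code itself;
report_invalid_lsn is a hypothetical helper that writes into a caller-supplied
buffer instead of calling pg_log()), each message has to be built from row i
rather than row 0:

```c
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: append one warning line per subscription name to buf.
 * Returns the number of names reported, or -1 if buf is too small.
 */
int
report_invalid_lsn(const char *const *names, int ntup,
                   char *buf, size_t bufsize)
{
    size_t used = 0;

    if (bufsize == 0)
        return -1;
    buf[0] = '\0';

    for (int i = 0; i < ntup; i++)
    {
        /* Index with i so that every subscription is named, not only the first. */
        int n = snprintf(buf + used, bufsize - used,
                         "WARNING:  subscription \"%s\" has an invalid remote_lsn\n",
                         names[i]);

        if (n < 0 || (size_t) n >= bufsize - used)
            return -1;  /* output would not fit */
        used += (size_t) n;
    }
    return ntup;
}
```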

> 10. 003_subscription.pl
>
> ```
> $old_sub->wait_for_subscription_sync($publisher, 'sub');
>
> my $result = $old_sub->safe_psql('postgres',
>     "SELECT COUNT(*) FROM pg_subscription_rel WHERE srsubstate != 'r'");
> is ($result, qq(0), "All tables in pg_subscription_rel should be in ready state");
> ```
>
> I think there is a possibility of a timing issue, because the SELECT may
> be executed before srsubstate is changed from 's' to 'r'. Maybe poll_query_until()
> can be used instead.

Modified

> 11. 003_subscription.pl
>
> ```
> command_ok(
>         [
>                 'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
>                 '-D',         $new_sub->data_dir, '-b', $bindir,
>                 '-B',         $bindir,            '-s', $new_sub->host,
>                 '-p',         $old_sub->port,     '-P', $new_sub->port,
>                 $mode,
>                 '--preserve-subscription-state',
>                 '--check',
>         ],
>         'run of pg_upgrade --check for old instance with correct sub rel');
> ```
>
> Missing check of pg_upgrade_output.d?

Modified

> And maybe you forgot to run pgperltidy.

It has been run for the new patch.

The attached v7 patch includes these changes.

Regards,
Vignesh
From 4660c0914b8c3aef92461b84d7170ffc11bf5dd9 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Thu, 7 Sep 2023 11:37:36 +0530
Subject: [PATCH v7] Preserve the full subscription's state during pg_upgrade

Previously, only the subscription metadata information was preserved.  Without
the list of relations and their state it's impossible to re-enable the
subscriptions without missing some records as the list of relations can only be
refreshed after enabling the subscription (and therefore starting the apply
worker).  Even if we added a way to refresh the subscription while enabling a
publication, we still wouldn't know which relations are new on the publication
side, and therefore should be fully synced, and which shouldn't.

Similarly, the subscription's replication origin is needed to ensure
that we don't replicate anything twice.

To fix this problem, this patch teaches pg_dump in binary upgrade mode to
restore the content of pg_subscription_rel from the old cluster by using the
binary_upgrade_create_sub_rel_state SQL function, and to restore the
underlying replication origin remote LSN by using the
binary_upgrade_sub_replication_origin_advance SQL function.  These functions
are not exposed to users and are only accepted in binary upgrade mode.

The new binary_upgrade_create_sub_rel_state SQL function has the following
syntax:
SELECT binary_upgrade_create_sub_rel_state(subname text, relid oid, state char [,sublsn pg_lsn])

In the above, subname is the subscription name, relid is the relation
identifier, state is the state of the relation, and sublsn is optional,
defaulting to NULL/InvalidXLogRecPtr if not provided. pg_dump will retrieve
these values (subname, relid, state and sublsn) from the old cluster.

For now, pg_upgrade will check that all the subscriptions have a valid
replication origin remote_lsn, and that all underlying relations are in
'r' (ready) state, and will error out if that's not the case, logging the
reason for the failure.

Author: Julien Rouhaud
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud
---
 doc/src/sgml/ref/pgupgrade.sgml          |   7 +
 src/backend/catalog/pg_subscription.c    | 127 +++++++++++++
 src/bin/pg_dump/common.c                 |  22 +++
 src/bin/pg_dump/pg_dump.c                | 177 ++++++++++++++++-
 src/bin/pg_dump/pg_dump.h                |  18 +-
 src/bin/pg_dump/pg_dump_sort.c           |  11 +-
 src/bin/pg_upgrade/check.c               |  80 ++++++++
 src/bin/pg_upgrade/meson.build           |   1 +
 src/bin/pg_upgrade/t/003_subscription.pl | 230 +++++++++++++++++++++++
 src/include/catalog/pg_proc.dat          |  14 ++
 src/tools/pgindent/typedefs.list         |   1 +
 11 files changed, 683 insertions(+), 5 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/003_subscription.pl

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index bea0d1b93f..cf9b6a4044 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -856,6 +856,13 @@ psql --username=postgres --file=script.sql postgres
    (<type>regclass</type>, <type>regrole</type>, and <type>regtype</type> can be upgraded.)
   </para>
 
+  <para>
+   To upgrade subscriptions, all the subscriptions on the old
+   cluster must have a valid <varname>remote_lsn</varname>, and all the
+   subscription tables must be in <literal>r</literal> (ready) state;
+   otherwise the <application>pg_upgrade</application> run will fail.
+  </para>
+
   <para>
    If you want to use link mode and you do not want your old cluster
    to be modified when the new cluster is started, consider using the clone mode.
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d07f88ce28..64e26dc291 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -25,6 +25,8 @@
 #include "catalog/pg_type.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "replication/origin.h"
+#include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
@@ -269,6 +271,131 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
 	table_close(rel, NoLock);
 }
 
+/*
+ * binary_upgrade_create_sub_rel_state
+ *
+ * Add the relation with the specified relation state to pg_subscription_rel
+ * table.
+ */
+Datum
+binary_upgrade_create_sub_rel_state(PG_FUNCTION_ARGS)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subid;
+	Form_pg_subscription form;
+	char	   *subname;
+	Oid			relid;
+	char		relstate;
+	XLogRecPtr	sublsn;
+
+	if (!IsBinaryUpgrade)
+		ereport(ERROR,
+				errcode(ERRCODE_SYNTAX_ERROR),
+				errmsg("binary_upgrade_create_sub_rel_state can only be called when server is in binary upgrade mode"));
+
+	/* We must check these things before dereferencing the arguments */
+	if (PG_ARGISNULL(0) ||
+		PG_ARGISNULL(1) ||
+		PG_ARGISNULL(2))
+		elog(ERROR, "null argument to binary_upgrade_create_sub_rel_state is not allowed");
+
+	subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	relid = PG_GETARG_OID(1);
+	relstate = PG_GETARG_CHAR(2);
+
+	if (PG_ARGISNULL(3))
+		sublsn = InvalidXLogRecPtr;
+	else
+		sublsn = PG_GETARG_LSN(3);
+
+	if (!OidIsValid(relid))
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("invalid relation identifier used: %u", relid));
+
+	tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("relation %u does not exist", relid));
+	ReleaseSysCache(tup);
+
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+	/* Fetch the existing tuple. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
+							  CStringGetDatum(subname));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg("subscription \"%s\" does not exist", subname));
+
+	form = (Form_pg_subscription) GETSTRUCT(tup);
+	subid = form->oid;
+
+	AddSubscriptionRelState(subid, relid, relstate, sublsn);
+
+	heap_freetuple(tup);
+	table_close(rel, RowExclusiveLock);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * binary_upgrade_sub_replication_origin_advance
+ *
+ * Update the remote_lsn for the subscriber's replication origin.
+ */
+Datum
+binary_upgrade_sub_replication_origin_advance(PG_FUNCTION_ARGS)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subid;
+	Form_pg_subscription form;
+	char	   *subname;
+	XLogRecPtr	sublsn;
+	char		originname[NAMEDATALEN];
+	RepOriginId originid;
+
+	if (!IsBinaryUpgrade)
+		ereport(ERROR,
+				errcode(ERRCODE_SYNTAX_ERROR),
+				errmsg("binary_upgrade_sub_replication_origin_advance can only be called when server is in binary upgrade mode"));
+
+	/* We must check these things before dereferencing the arguments */
+	if (PG_ARGISNULL(0) ||
+		PG_ARGISNULL(1))
+		elog(ERROR, "null argument to binary_upgrade_sub_replication_origin_advance is not allowed");
+
+	subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	sublsn = PG_GETARG_LSN(1);
+
+	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+
+	/* Fetch the existing tuple. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONNAME, MyDatabaseId,
+							  CStringGetDatum(subname));
+	if (!HeapTupleIsValid(tup))
+		ereport(ERROR,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg("subscription \"%s\" does not exist", subname));
+
+	form = (Form_pg_subscription) GETSTRUCT(tup);
+	subid = form->oid;
+
+	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
+	originid = replorigin_by_name(originname, false);
+	replorigin_advance(originid, sublsn, InvalidXLogRecPtr,
+					   false /* backward */ ,
+					   false /* WAL log */ );
+	heap_freetuple(tup);
+	table_close(rel, RowExclusiveLock);
+
+	PG_RETURN_VOID();
+}
+
 /*
  * Update the state of a subscription table.
  */
diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c
index 8b0c1e7b53..764a39fcb9 100644
--- a/src/bin/pg_dump/common.c
+++ b/src/bin/pg_dump/common.c
@@ -24,6 +24,7 @@
 #include "catalog/pg_operator_d.h"
 #include "catalog/pg_proc_d.h"
 #include "catalog/pg_publication_d.h"
+#include "catalog/pg_subscription_d.h"
 #include "catalog/pg_type_d.h"
 #include "common/hashfn.h"
 #include "fe_utils/string_utils.h"
@@ -265,6 +266,9 @@ getSchemaData(Archive *fout, int *numTablesPtr)
 	pg_log_info("reading subscriptions");
 	getSubscriptions(fout);
 
+	pg_log_info("reading subscription membership of tables");
+	getSubscriptionTables(fout);
+
 	free(inhinfo);				/* not needed any longer */
 
 	*numTablesPtr = numTables;
@@ -978,6 +982,24 @@ findPublicationByOid(Oid oid)
 	return (PublicationInfo *) dobj;
 }
 
+/*
+ * findSubscriptionByOid
+ *	  finds the DumpableObject for the subscription with the given oid
+ *	  returns NULL if not found
+ */
+SubscriptionInfo *
+findSubscriptionByOid(Oid oid)
+{
+	CatalogId	catId;
+	DumpableObject *dobj;
+
+	catId.tableoid = SubscriptionRelationId;
+	catId.oid = oid;
+	dobj = findObjectByCatalogId(catId);
+	Assert(dobj == NULL || dobj->objType == DO_SUBSCRIPTION);
+	return (SubscriptionInfo *) dobj;
+}
+
 
 /*
  * recordExtensionMembership
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index f7b6176692..2abeb573e7 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -296,6 +296,7 @@ static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo);
 static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo);
 static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo);
 static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo);
+static void dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo);
 static void dumpDatabase(Archive *fout);
 static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf,
 							   const char *dbname, Oid dboid);
@@ -4576,6 +4577,92 @@ is_superuser(Archive *fout)
 	return false;
 }
 
+/*
+ * getSubscriptionTables
+ *	  get information about subscription membership for dumpable tables.
+ */
+void
+getSubscriptionTables(Archive *fout)
+{
+	SubscriptionInfo *subinfo;
+	SubRelInfo *subrinfo;
+	PQExpBuffer query;
+	PGresult   *res;
+	int			i_srsubid;
+	int			i_srrelid;
+	int			i_srsubstate;
+	int			i_srsublsn;
+	int			i;
+	int			cur_rel = 0;
+	int			ntups;
+	Oid			last_srsubid = InvalidOid;
+
+	if (!fout->dopt->binary_upgrade || fout->remoteVersion < 100000)
+		return;
+
+	query = createPQExpBuffer();
+	appendPQExpBuffer(query, "SELECT srsubid, srrelid, srsubstate, srsublsn"
+					  " FROM pg_catalog.pg_subscription_rel"
+					  " ORDER BY srsubid");
+	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+	ntups = PQntuples(res);
+	if (ntups == 0)
+		goto cleanup;
+
+	/* Get subscription relation fields */
+	i_srsubid = PQfnumber(res, "srsubid");
+	i_srrelid = PQfnumber(res, "srrelid");
+	i_srsubstate = PQfnumber(res, "srsubstate");
+	i_srsublsn = PQfnumber(res, "srsublsn");
+
+	subrinfo = pg_malloc(ntups * sizeof(SubRelInfo));
+	for (i = 0; i < ntups; i++)
+	{
+		Oid			cur_srsubid = atooid(PQgetvalue(res, i, i_srsubid));
+		Oid			relid = atooid(PQgetvalue(res, i, i_srrelid));
+		TableInfo  *tblinfo;
+
+		/*
+		 * If we switched to a new subscription, check if the subscription
+		 * exists.
+		 */
+		if (cur_srsubid != last_srsubid)
+		{
+			subinfo = findSubscriptionByOid(cur_srsubid);
+			if (subinfo == NULL)
+				pg_fatal("subscription with OID %u does not exist", cur_srsubid);
+
+			last_srsubid = cur_srsubid;
+		}
+
+		tblinfo = findTableByOid(relid);
+		if (tblinfo == NULL)
+			pg_fatal("failed sanity check, table with OID %u not found",
+					 relid);
+
+		/* OK, make a DumpableObject for this relationship */
+		subrinfo[cur_rel].dobj.objType = DO_SUBSCRIPTION_REL;
+		subrinfo[cur_rel].dobj.catId.tableoid = relid;
+		subrinfo[cur_rel].dobj.catId.oid = cur_srsubid;
+		AssignDumpId(&subrinfo[cur_rel].dobj);
+		subrinfo[cur_rel].dobj.name = pg_strdup(subinfo->dobj.name);
+		subrinfo[cur_rel].tblinfo = tblinfo;
+		subrinfo[cur_rel].srsubstate = PQgetvalue(res, i, i_srsubstate)[0];
+		subrinfo[cur_rel].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn));
+		subrinfo[cur_rel].subinfo = subinfo;
+
+		/* Decide whether we want to dump it */
+		selectDumpableObject(&(subrinfo[cur_rel].dobj), fout);
+
+		cur_rel++;
+	}
+
+cleanup:
+	PQclear(res);
+	destroyPQExpBuffer(query);
+}
+
 /*
  * getSubscriptions
  *	  get information about subscriptions
@@ -4601,6 +4688,7 @@ getSubscriptions(Archive *fout)
 	int			i_subpublications;
 	int			i_subbinary;
 	int			i_subpasswordrequired;
+	int			i_suboriginremotelsn;
 	int			i,
 				ntups;
 
@@ -4655,15 +4743,19 @@ getSubscriptions(Archive *fout)
 	if (fout->remoteVersion >= 160000)
 		appendPQExpBufferStr(query,
 							 " s.suborigin,\n"
-							 " s.subpasswordrequired\n");
+							 " s.subpasswordrequired,\n");
 	else
 		appendPQExpBuffer(query,
 						  " '%s' AS suborigin,\n"
-						  " 't' AS subpasswordrequired\n",
+						  " 't' AS subpasswordrequired,\n",
 						  LOGICALREP_ORIGIN_ANY);
 
+	appendPQExpBufferStr(query, "o.remote_lsn\n");
+
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n"
+						 "LEFT JOIN pg_replication_origin_status o \n"
+						 "    ON o.external_id = 'pg_' || s.oid::text \n"
 						 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
 						 "                   WHERE datname = current_database())");
 
@@ -4689,6 +4781,7 @@ getSubscriptions(Archive *fout)
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_suborigin = PQfnumber(res, "suborigin");
 	i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
+	i_suboriginremotelsn = PQfnumber(res, "remote_lsn");
 
 	subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4721,6 +4814,11 @@ getSubscriptions(Archive *fout)
 		subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
 		subinfo[i].subpasswordrequired =
 			pg_strdup(PQgetvalue(res, i, i_subpasswordrequired));
+		if (PQgetisnull(res, i, i_suboriginremotelsn))
+			subinfo[i].suboriginremotelsn = NULL;
+		else
+			subinfo[i].suboriginremotelsn =
+				pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4730,6 +4828,71 @@ getSubscriptions(Archive *fout)
 	destroyPQExpBuffer(query);
 }
 
+/*
+ * dumpSubscriptionTable
+ *	  dump the definition of the given subscription table mapping
+ */
+static void
+dumpSubscriptionTable(Archive *fout, const SubRelInfo *subrinfo)
+{
+	DumpOptions *dopt = fout->dopt;
+	SubscriptionInfo *subinfo = subrinfo->subinfo;
+	PQExpBuffer query;
+	char	   *tag;
+
+	/* Do nothing in data-only dump */
+	if (dopt->dataOnly)
+		return;
+
+	tag = psprintf("%s %s", subinfo->dobj.name, subrinfo->dobj.name);
+
+	query = createPQExpBuffer();
+
+	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+	{
+		/*
+		 * binary_upgrade_create_sub_rel_state will add the subscription
+		 * relation to the pg_subscription_rel table. This is supported only
+		 * for upgrade operations.
+		 */
+		if (fout->dopt->binary_upgrade && fout->remoteVersion >= 100000)
+		{
+			appendPQExpBuffer(query,
+							  "SELECT binary_upgrade_create_sub_rel_state('%s', %u, '%c'",
+							  subrinfo->dobj.name,
+							  subrinfo->tblinfo->dobj.catId.oid,
+							  subrinfo->srsubstate);
+
+			if (subrinfo->srsublsn[0] != '\0')
+				appendPQExpBuffer(query, ", '%s'",
+								  subrinfo->srsublsn);
+
+			appendPQExpBufferStr(query, ");\n");
+		}
+	}
+
+	/*
+	 * There is no point in creating a drop query as the drop is done by table
+	 * drop.  (If you think to change this, see also _printTocEntry().)
+	 * Although this object doesn't really have ownership as such, set the
+	 * owner field anyway to ensure that the command is run by the correct
+	 * role at restore time.
+	 */
+	if (subrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+		ArchiveEntry(fout, subrinfo->dobj.catId, subrinfo->dobj.dumpId,
+					 ARCHIVE_OPTS(.tag = tag,
+								  .namespace = subrinfo->tblinfo->dobj.namespace->dobj.name,
+								  .owner = subinfo->rolname,
+								  .description = "SUBSCRIPTION TABLE",
+								  .section = SECTION_POST_DATA,
+								  .createStmt = query->data));
+
+	/* These objects can't currently have comments or seclabels */
+
+	free(tag);
+	destroyPQExpBuffer(query);
+}
+
 /*
  * dumpSubscription
  *	  dump the definition of the given subscription
@@ -4807,6 +4970,12 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 
 	appendPQExpBufferStr(query, ");\n");
 
+	if (dopt->binary_upgrade && subinfo->suboriginremotelsn)
+		appendPQExpBuffer(query,
+						  "SELECT binary_upgrade_sub_replication_origin_advance('%s', '%s');\n",
+						  subinfo->dobj.name,
+						  subinfo->suboriginremotelsn);
+
 	if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
 		ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
 					 ARCHIVE_OPTS(.tag = subinfo->dobj.name,
@@ -10425,6 +10594,9 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
 		case DO_SUBSCRIPTION:
 			dumpSubscription(fout, (const SubscriptionInfo *) dobj);
 			break;
+		case DO_SUBSCRIPTION_REL:
+			dumpSubscriptionTable(fout, (const SubRelInfo *) dobj);
+			break;
 		case DO_PRE_DATA_BOUNDARY:
 		case DO_POST_DATA_BOUNDARY:
 			/* never dumped, nothing to do */
@@ -18491,6 +18663,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
 			case DO_PUBLICATION_REL:
 			case DO_PUBLICATION_TABLE_IN_SCHEMA:
 			case DO_SUBSCRIPTION:
+			case DO_SUBSCRIPTION_REL:
 				/* Post-data objects: must come after the post-data boundary */
 				addObjectDependency(dobj, postDataBound->dumpId);
 				break;
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 9036b13f6a..dd7ae15505 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -82,7 +82,8 @@ typedef enum
 	DO_PUBLICATION,
 	DO_PUBLICATION_REL,
 	DO_PUBLICATION_TABLE_IN_SCHEMA,
-	DO_SUBSCRIPTION
+	DO_SUBSCRIPTION,
+	DO_SUBSCRIPTION_REL
 } DumpableObjectType;
 
 /*
@@ -670,8 +671,21 @@ typedef struct _SubscriptionInfo
 	char	   *subsynccommit;
 	char	   *subpublications;
 	char	   *subpasswordrequired;
+	char	   *suboriginremotelsn;
 } SubscriptionInfo;
 
+/*
+ * The SubRelInfo struct is used to represent a subscription relation.
+ */
+typedef struct _SubRelInfo
+{
+	DumpableObject dobj;
+	SubscriptionInfo *subinfo;
+	TableInfo  *tblinfo;
+	char		srsubstate;
+	char	   *srsublsn;
+} SubRelInfo;
+
 /*
  *	common utility functions
  */
@@ -696,6 +710,7 @@ extern CollInfo *findCollationByOid(Oid oid);
 extern NamespaceInfo *findNamespaceByOid(Oid oid);
 extern ExtensionInfo *findExtensionByOid(Oid oid);
 extern PublicationInfo *findPublicationByOid(Oid oid);
+extern SubscriptionInfo *findSubscriptionByOid(Oid oid);
 
 extern void recordExtensionMembership(CatalogId catId, ExtensionInfo *ext);
 extern ExtensionInfo *findOwningExtension(CatalogId catalogId);
@@ -755,5 +770,6 @@ extern void getPublicationNamespaces(Archive *fout);
 extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
 								 int numTables);
 extern void getSubscriptions(Archive *fout);
+extern void getSubscriptionTables(Archive *fout);
 
 #endif							/* PG_DUMP_H */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 523a19c155..5bf1e47ee6 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -93,6 +93,7 @@ enum dbObjectTypePriorities
 	PRIO_PUBLICATION_REL,
 	PRIO_PUBLICATION_TABLE_IN_SCHEMA,
 	PRIO_SUBSCRIPTION,
+	PRIO_SUBSCRIPTION_REL,
 	PRIO_DEFAULT_ACL,			/* done in ACL pass */
 	PRIO_EVENT_TRIGGER,			/* must be next to last! */
 	PRIO_REFRESH_MATVIEW		/* must be last! */
@@ -146,10 +147,11 @@ static const int dbObjectTypePriority[] =
 	PRIO_PUBLICATION,			/* DO_PUBLICATION */
 	PRIO_PUBLICATION_REL,		/* DO_PUBLICATION_REL */
 	PRIO_PUBLICATION_TABLE_IN_SCHEMA,	/* DO_PUBLICATION_TABLE_IN_SCHEMA */
-	PRIO_SUBSCRIPTION			/* DO_SUBSCRIPTION */
+	PRIO_SUBSCRIPTION,			/* DO_SUBSCRIPTION */
+	PRIO_SUBSCRIPTION_REL		/* DO_SUBSCRIPTION_REL */
 };
 
-StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_SUBSCRIPTION + 1),
+StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_SUBSCRIPTION_REL + 1),
 				 "array length mismatch");
 
 static DumpId preDataBoundId;
@@ -1542,6 +1544,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
 					 "SUBSCRIPTION (ID %d OID %u)",
 					 obj->dumpId, obj->catId.oid);
 			return;
+		case DO_SUBSCRIPTION_REL:
+			snprintf(buf, bufsize,
+					 "SUBSCRIPTION TABLE (ID %d)",
+					 obj->dumpId);
+			return;
 		case DO_PRE_DATA_BOUNDARY:
 			snprintf(buf, bufsize,
 					 "PRE-DATA BOUNDARY  (ID %d)",
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 56e313f562..5bf4ba8aa1 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -20,6 +20,7 @@ static void check_is_install_user(ClusterInfo *cluster);
 static void check_proper_datallowconn(ClusterInfo *cluster);
 static void check_for_prepared_transactions(ClusterInfo *cluster);
 static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster);
+static void check_for_subscription_state(ClusterInfo *cluster);
 static void check_for_user_defined_postfix_ops(ClusterInfo *cluster);
 static void check_for_incompatible_polymorphics(ClusterInfo *cluster);
 static void check_for_tables_with_oids(ClusterInfo *cluster);
@@ -104,6 +105,8 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
 
+	check_for_subscription_state(&old_cluster);
+
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the
 	 * on-disk format for existing data.
@@ -785,6 +788,83 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
 		check_ok();
 }
 
+/*
+ * check_for_subscription_state()
+ *
+ * Verify that all subscriptions have a valid remote_lsn and don't contain
+ * any table in srsubstate different than ready ('r').
+ */
+static void
+check_for_subscription_state(ClusterInfo *cluster)
+{
+	int			dbnum;
+	bool		is_error = false;
+
+	/* PG 10 introduced subscriptions. */
+	if (GET_MAJOR_VERSION(old_cluster.major_version) < 1000)
+		return;
+
+	prep_status("Checking for subscription state");
+
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	{
+		PGresult   *res;
+		DbInfo	   *active_db = &cluster->dbarr.dbs[dbnum];
+		PGconn	   *conn = connectToServer(cluster, active_db->db_name);
+
+		/* We need to check for pg_replication_origin_status only once. */
+		if (dbnum == 0)
+		{
+			int			ntup;
+
+			res = executeQueryOrDie(conn,
+									"SELECT s.subname, d.datname "
+									"FROM pg_catalog.pg_subscription s "
+									"LEFT JOIN pg_catalog.pg_replication_origin_status os"
+									"  ON os.external_id = 'pg_' || s.oid "
+									"LEFT JOIN pg_catalog.pg_database d"
+									"  ON d.oid = s.subdbid "
+									"WHERE coalesce(remote_lsn, '0/0') = '0/0'");
+
+			ntup = PQntuples(res);
+			for (int i = 0; i < ntup; i++)
+			{
+				is_error = true;
+				pg_log(PG_WARNING,
+					   "\nWARNING: database \"%s\" has subscription \"%s\" with an invalid remote_lsn",
+					   PQgetvalue(res, i, 1),
+					   PQgetvalue(res, i, 0));
+			}
+			PQclear(res);
+		}
+
+		res = executeQueryOrDie(conn,
+								"SELECT count(0) "
+								"FROM pg_catalog.pg_subscription_rel "
+								"WHERE srsubstate != 'r'");
+
+		if (PQntuples(res) != 1)
+			pg_fatal("could not determine the number of non-ready subscription relations");
+
+		if (strcmp(PQgetvalue(res, 0, 0), "0") != 0)
+		{
+			is_error = true;
+			pg_log(PG_WARNING,
+				   "\nWARNING: database \"%s\" has %s subscription relation(s) in non-ready state",
+				   active_db->db_name,
+				   PQgetvalue(res, 0, 0));
+		}
+
+		PQclear(res);
+		PQfinish(conn);
+	}
+
+	if (is_error)
+		pg_fatal("subscription(s) have an invalid remote_lsn or subscription relation(s) are not in ready state");
+
+	check_ok();
+}
+
 /*
  * Verify that no user defined postfix operators exist.
  */
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..9ea25dec70 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_subscription.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/t/003_subscription.pl b/src/bin/pg_upgrade/t/003_subscription.pl
new file mode 100644
index 0000000000..350c7971f0
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_subscription.pl
@@ -0,0 +1,230 @@
+# Copyright (c) 2022-2023, PostgreSQL Global Development Group
+
+# Test for pg_upgrade of logical subscription
+use strict;
+use warnings;
+
+use Cwd qw(abs_path);
+use File::Basename qw(dirname);
+use File::Compare;
+use File::Find qw(find);
+use File::Path qw(rmtree);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::AdjustUpgrade;
+use Test::More;
+
+# Can be changed to test the other modes.
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize publisher node
+my $publisher = PostgreSQL::Test::Cluster->new('publisher');
+$publisher->init(allows_streaming => 'logical');
+$publisher->start;
+
+# Initialize the old subscriber node
+my $old_sub = PostgreSQL::Test::Cluster->new('old_sub');
+$old_sub->init;
+$old_sub->start;
+
+# Initialize the new subscriber
+my $new_sub = PostgreSQL::Test::Cluster->new('new_sub');
+$new_sub->init;
+my $bindir = $new_sub->config_data('--bindir');
+
+sub insert_line
+{
+	my $payload = shift;
+
+	foreach ("t1", "t2")
+	{
+		$publisher->safe_psql('postgres',
+			"INSERT INTO " . $_ . " (val) VALUES('$payload')");
+	}
+}
+
+# Initial setup
+foreach ("t1", "t2")
+{
+	$publisher->safe_psql('postgres',
+		"CREATE TABLE " . $_ . " (id serial, val text)");
+	$old_sub->safe_psql('postgres',
+		"CREATE TABLE " . $_ . " (id serial, val text)");
+}
+insert_line('before initial sync');
+
+# Setup logical replication, replicating only 1 table
+my $connstr = $publisher->connstr . ' dbname=postgres';
+
+$publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR TABLE t1");
+
+$old_sub->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$connstr' PUBLICATION pub");
+
+# Wait for the catchup, as we need the subscription rel in ready state
+$old_sub->wait_for_subscription_sync($publisher, 'sub');
+
+# ------------------------------------------------------
+# Check that pg_upgrade refuses to run if there's a subscription without a
+# valid remote_lsn.
+# ------------------------------------------------------
+
+# Replication origin's remote_lsn isn't set if no data is replicated after the
+# initial sync.
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+		'-D',         $new_sub->data_dir, '-b', $bindir,
+		'-B',         $bindir,            '-s', $new_sub->host,
+		'-p',         $old_sub->port,     '-P', $new_sub->port,
+		$mode,        '--check',
+	],
+	'run of pg_upgrade --check for old instance with invalid remote_lsn');
+ok(-d $new_sub->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+rmtree($new_sub->data_dir . "/pg_upgrade_output.d");
+
+# Make sure the replication origin is set
+insert_line('after initial sync');
+$old_sub->wait_for_subscription_sync($publisher, 'sub');
+
+my $result = $old_sub->safe_psql('postgres',
+	"SELECT COUNT(*) FROM pg_subscription_rel WHERE srsubstate != 'r'");
+is($result, qq(0),
+	"All tables in pg_subscription_rel should be in ready state");
+
+# Ensure that relation has reached 'ready' state
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');";
+$old_sub->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check the number of rows for each table on each server
+$result = $publisher->safe_psql('postgres', "SELECT count(*) FROM t1");
+is($result, qq(2), "check initial t1 table data on publisher");
+$result = $publisher->safe_psql('postgres', "SELECT count(*) FROM t2");
+is($result, qq(2), "check initial t2 table data on publisher");
+$result = $old_sub->safe_psql('postgres', "SELECT count(*) FROM t1");
+is($result, qq(2), "check initial t1 table data on the old subscriber");
+$result = $old_sub->safe_psql('postgres', "SELECT count(*) FROM t2");
+is($result, qq(0), "check initial t2 table data on the old subscriber");
+
+# ------------------------------------------------------
+# Check that pg_upgrade refuses to run if there's a subscription with tables in
+# a state different than 'r' (ready).
+# ------------------------------------------------------
+
+$old_sub->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+
+# Set tables to 'i' state
+$old_sub->safe_psql(
+	'postgres',
+	"UPDATE pg_subscription_rel
+		SET srsubstate = 'i' WHERE srsubstate = 'r'");
+
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+		'-D',         $new_sub->data_dir, '-b', $bindir,
+		'-B',         $bindir,            '-s', $new_sub->host,
+		'-p',         $old_sub->port,     '-P', $new_sub->port,
+		$mode,        '--check',
+	],
+	'run of pg_upgrade --check for old instance with incorrect sub rel');
+ok(-d $new_sub->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+rmtree($new_sub->data_dir . "/pg_upgrade_output.d");
+
+# ------------------------------------------------------
+# Check that pg_upgrade doesn't detect any problem once all the subscription's
+# relations are in 'r' (ready) state.
+# ------------------------------------------------------
+
+$old_sub->safe_psql(
+	'postgres',
+	"UPDATE pg_subscription_rel
+		SET srsubstate = 'r' WHERE srsubstate = 'i'");
+
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+		'-D',         $new_sub->data_dir, '-b', $bindir,
+		'-B',         $bindir,            '-s', $new_sub->host,
+		'-p',         $old_sub->port,     '-P', $new_sub->port,
+		$mode,        '--check',
+	],
+	'run of pg_upgrade --check for old instance with correct sub rel');
+ok( !-d $new_sub->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+# ------------------------------------------------------
+# Check that after upgrading the subscriber server, the incremental
+# changes added to the publisher are replicated.
+# ------------------------------------------------------
+
+# Stop the old subscriber, insert a row in each table while it's down and add
+# t2 to the publication
+my $remote_lsn = $old_sub->safe_psql('postgres',
+	"SELECT remote_lsn FROM pg_replication_origin_status");
+$old_sub->stop;
+
+insert_line('while old_sub is down');
+
+# Run pg_upgrade
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+		'-D',         $new_sub->data_dir, '-b', $bindir,
+		'-B',         $bindir,            '-s', $new_sub->host,
+		'-p',         $old_sub->port,     '-P', $new_sub->port,
+		$mode,
+	],
+	'run of pg_upgrade for new sub');
+ok( !-d $new_sub->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+$publisher->safe_psql('postgres', "ALTER PUBLICATION pub ADD TABLE t2");
+
+$new_sub->start;
+
+# Subscription relations and replication origin remote_lsn should be preserved
+$result =
+  $new_sub->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel");
+is($result, qq(1), "There should be 1 row in pg_subscription_rel");
+
+$result = $new_sub->safe_psql('postgres',
+	"SELECT remote_lsn FROM pg_replication_origin_status");
+is($result, qq($remote_lsn), "remote_lsn should have been preserved");
+
+# There should be no new replicated rows before enabling the subscription
+$result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM t1");
+is($result, qq(2),
+	"t1 table has no new replicated rows before enabling the subscription");
+$result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM t2");
+is($result, qq(0),
+	"no change in t2 table which is not part of the publication");
+
+# Enable the subscription
+$new_sub->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+$publisher->wait_for_catchup('sub');
+
+# Rows on t1 should have been replicated, while nothing should happen for t2
+$result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM t1");
+is($result, qq(3), "check replicated inserts on new subscriber");
+$result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM t2");
+is($result, qq(0),
+	"no change in table t2, which is not part of the publication, after enabling the subscription"
+);
+
+# Refresh the subscription; the rows missing from t2 should now be replicated
+$new_sub->safe_psql('postgres', "ALTER SUBSCRIPTION sub REFRESH PUBLICATION");
+$new_sub->wait_for_subscription_sync($publisher, 'sub');
+$result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM t1");
+is($result, qq(3),
+	"check that t1 is unchanged after refreshing the subscription");
+$result = $new_sub->safe_psql('postgres', "SELECT count(*) FROM t2");
+is($result, qq(3),
+	"check replicated inserts on new subscriber after refreshing");
+
+done_testing();
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9805bc6118..1c1eeaa667 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5488,6 +5488,20 @@
   proargmodes => '{i,o,o,o,o,o,o,o,o,o}',
   proargnames => '{subid,subid,relid,pid,leader_pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}',
   prosrc => 'pg_stat_get_subscription' },
+{ oid => '6108', descr => 'add a relation with the specified relation state to pg_subscription_rel table',
+  proname => 'binary_upgrade_create_sub_rel_state', prorettype => 'void',
+  proargtypes => 'text oid char pg_lsn',
+  proallargtypes => '{text,oid,char,pg_lsn}',
+  proargmodes => '{i,i,i,i}',
+  proargnames => '{subname,relid,relstate,sublsn}',
+  prosrc => 'binary_upgrade_create_sub_rel_state' },
+{ oid => '6109', descr => 'update the remote_lsn for the subscriber\'s replication origin',
+  proname => 'binary_upgrade_sub_replication_origin_advance', prorettype => 'void',
+  proargtypes => 'text pg_lsn',
+  proallargtypes => '{text,pg_lsn}',
+  proargmodes => '{i,i}',
+  proargnames => '{subname,sublsn}',
+  prosrc => 'binary_upgrade_sub_replication_origin_advance' },
 { oid => '2026', descr => 'statistics: current backend PID',
   proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r',
   prorettype => 'int4', proargtypes => '', prosrc => 'pg_backend_pid' },
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index f2af84d7ca..ff03f2a830 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2650,6 +2650,7 @@ SubLinkType
 SubOpts
 SubPlan
 SubPlanState
+SubRelInfo
 SubRemoveRels
 SubTransactionId
 SubXactCallback
-- 
2.34.1
