Hi,

On Thu, Apr 06, 2023 at 04:49:59AM +0000, Hayato Kuroda (Fujitsu) wrote:
> Dear Julien,
>
> > I'm attaching a v3 to fix a recent conflict with pg_dump due to 
> > a563c24c9574b7
> > (Allow pg_dump to include/exclude child tables automatically).
>
> Thank you for making the patch.
> FYI - it could not be applied due to recent commits. SUBOPT_* and attributes
> in SubscriptionInfo was added these days.

Thanks a lot for warning me!

While rebasing and testing the patch, I realized that I forgot to git-add a
chunk, so I went ahead and added some minimal TAP tests to make sure that the
feature and the various checks work as expected.  They also demonstrate that you
can safely resume replication after running pg_upgrade on a logical replication
setup where only some of the tables are added to a publication, and where new
rows and new tables are added to the publication while pg_upgrade is running
(for the new table you obviously need to make sure that the same relation exists
on the subscriber side, but that's orthogonal to this patch).

While doing so, I also realized that the subscription's underlying replication
origin remote LSN is only set after some activity is seen *after* the initial
sync, so I also added a new check in pg_upgrade to make sure that all remote
origins tied to a subscription have a valid remote_lsn when the new option is
used.  The documentation and the TAP tests are updated to cover that.
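
To make the new check concrete, here is a minimal standalone C sketch of the
condition it enforces, assuming the textual "X/Y" LSN form (two hex 32-bit
halves).  A missing origin row (NULL from the LEFT JOIN) and a remote_lsn of
0/0 are both treated as invalid, mirroring the
coalesce(remote_lsn, '0/0') = '0/0' test in the patch.  parse_hex32(),
parse_lsn_text() and remote_lsn_is_valid() are hypothetical helpers for
illustration only, not PostgreSQL functions; the server itself parses LSNs
with pg_lsn_in.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical helper: parse 1 to 8 hex digits (one 32-bit half of an LSN)
 * and advance *p past them.  Returns false if there is no digit or if more
 * than 8 digits would overflow 32 bits.
 */
static bool
parse_hex32(const char **p, uint32_t *out)
{
	uint32_t	value = 0;
	int			ndigits = 0;

	assert(p != NULL && *p != NULL);

	while (isxdigit((unsigned char) **p))
	{
		int			c = tolower((unsigned char) **p);
		int			digit = isdigit(c) ? c - '0' : c - 'a' + 10;

		if (++ndigits > 8)
			return false;
		value = (value << 4) | (uint32_t) digit;
		(*p)++;
	}
	if (ndigits == 0)
		return false;
	*out = value;
	return true;
}

/* Hypothetical helper: parse the textual "X/Y" LSN form into 64 bits. */
static bool
parse_lsn_text(const char *str, uint64_t *lsn)
{
	uint32_t	hi,
				lo;

	if (str == NULL || !parse_hex32(&str, &hi) || *str++ != '/' ||
		!parse_hex32(&str, &lo) || *str != '\0')
		return false;
	*lsn = ((uint64_t) hi << 32) | lo;
	return true;
}

/*
 * Mirrors "coalesce(remote_lsn, '0/0') = '0/0'": a missing origin row (NULL)
 * or a zero LSN means the origin hasn't recorded any progress yet, so the
 * subscription state can't be preserved.  Unparsable text is also rejected.
 */
static bool
remote_lsn_is_valid(const char *remote_lsn)
{
	uint64_t	lsn;

	if (remote_lsn == NULL)
		return false;
	return parse_lsn_text(remote_lsn, &lsn) && lsn != 0;
}
```

For example, remote_lsn_is_valid("0/0") returns false, while
remote_lsn_is_valid("0/1A2B") returns true.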

v4 attached.
From a5823a0ea289860367e0ebfb76c7dad7be5337e7 Mon Sep 17 00:00:00 2001
From: Julien Rouhaud <julien.rouh...@free.fr>
Date: Wed, 22 Feb 2023 09:19:32 +0800
Subject: [PATCH v4] Optionally preserve the full subscription's state during
 pg_upgrade

Previously, only the subscription metadata information was preserved.  Without
the list of relations and their state it's impossible to re-enable the
subscriptions without missing some records, as the list of relations can only be
refreshed after enabling the subscription (and therefore starting the apply
worker).  Even if we added a way to refresh the subscription while enabling it,
we still wouldn't know which relations are new on the publication side, and
therefore should be fully synced, and which shouldn't.

Similarly, the subscriptions' replication origins are needed to ensure
that we don't replicate anything twice.

To fix this problem, this patch teaches pg_dump in binary upgrade mode to emit
additional commands to be able to restore the content of pg_subscription_rel,
and an additional LSN parameter in the subscription creation to restore the
underlying replication origin remote LSN.  The LSN parameter is only accepted
in CREATE SUBSCRIPTION in binary upgrade mode.

The new ALTER SUBSCRIPTION subcommand, usable only during binary upgrade, has
the following syntax:

ALTER SUBSCRIPTION name ADD TABLE (relid = XYZ, state = 'x' [, lsn = 'X/Y'])

The relation is identified by its oid, as it's preserved during pg_upgrade.
The lsn is optional, and defaults to NULL / InvalidXLogRecPtr if not provided.
Explicitly passing InvalidXLogRecPtr (0/0) is however not allowed.
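
The option rules above can be summarized by a small standalone C sketch.
AddTableOpts and check_add_table_opts() are hypothetical (in the patch the
checks live in parse_subscription_options() and AlterSubscription(), and a
missing relid or state is only asserted); the sketch returns an error message
when relid is missing or 0 (InvalidOid), when state is missing or not a single
character, or when an lsn of 0/0 is passed explicitly, and otherwise leaves an
omitted lsn as 0 (InvalidXLogRecPtr).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical, already-tokenized view of the ADD TABLE option list. */
typedef struct AddTableOpts
{
	bool		has_relid;
	uint32_t	relid;			/* 0 plays the role of InvalidOid */
	const char *state;			/* NULL if not given */
	bool		has_lsn;
	uint64_t	lsn;			/* 0 plays the role of InvalidXLogRecPtr */
} AddTableOpts;

/*
 * Returns NULL if the options are acceptable, otherwise an error message.
 * On success, opts->lsn is reset to 0 when no lsn was provided, matching
 * the "defaults to NULL / InvalidXLogRecPtr" rule.
 */
static const char *
check_add_table_opts(AddTableOpts *opts)
{
	assert(opts != NULL);

	if (!opts->has_relid || opts->relid == 0)
		return "invalid relation identifier used";
	if (opts->state == NULL || strlen(opts->state) != 1)
		return "invalid relation state used";
	if (opts->has_lsn && opts->lsn == 0)
		return "explicit lsn 0/0 is not allowed";	/* hypothetical message */
	if (!opts->has_lsn)
		opts->lsn = 0;
	return NULL;
}
```

Returning a message rather than erroring out keeps the sketch self-contained;
in the server these would be ereport(ERROR, ...) calls.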

This mode is optional and not enabled by default.  A new
--preserve-subscription-state option is added to pg_upgrade to use it.  For
now, pg_upgrade will check that all the subscriptions have a valid replication
origin remote_lsn, and that all underlying relations are in 'r' (ready) state,
and will error out if that's not the case, logging the reason for the failure.

Author: Julien Rouhaud
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud
---
 doc/src/sgml/ref/pgupgrade.sgml          |  19 +++
 src/backend/commands/subscriptioncmds.c  |  67 +++++++-
 src/backend/parser/gram.y                |  11 ++
 src/bin/pg_dump/pg_backup.h              |   2 +
 src/bin/pg_dump/pg_dump.c                | 114 ++++++++++++-
 src/bin/pg_dump/pg_dump.h                |  13 ++
 src/bin/pg_upgrade/check.c               |  82 +++++++++
 src/bin/pg_upgrade/dump.c                |   3 +-
 src/bin/pg_upgrade/meson.build           |   1 +
 src/bin/pg_upgrade/option.c              |   7 +
 src/bin/pg_upgrade/pg_upgrade.h          |   1 +
 src/bin/pg_upgrade/t/003_subscription.pl | 204 +++++++++++++++++++++++
 src/include/nodes/parsenodes.h           |   3 +-
 13 files changed, 522 insertions(+), 5 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/003_subscription.pl

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 7816b4c685..b23c536954 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -240,6 +240,25 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--preserve-subscription-state</option></term>
+      <listitem>
+       <para>
+        Fully preserve the logical subscription state, if any.  That includes
+        the underlying replication origins with their remote LSN and the list of
+        relations in each subscription, so that replication can simply be
+        resumed when the subscriptions are reactivated.
+        If that option isn't used, it is up to the user to reactivate the
+        subscriptions in a suitable way; see the subscription part in <xref
+        linkend="pg-dump-notes"/> for more information.
+        If this option is used and any of the subscriptions on the old cluster
+        has an unknown <varname>remote_lsn</varname> (0/0), or has any relation
+        in a state different from <literal>r</literal> (ready), the
+        <application>pg_upgrade</application> run will fail with an error.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-?</option></term>
       <term><option>--help</option></term>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3251d89ba8..4fa688d16f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -71,6 +71,8 @@
 #define SUBOPT_RUN_AS_OWNER                    0x00001000
 #define SUBOPT_LSN                                     0x00002000
 #define SUBOPT_ORIGIN                          0x00004000
+#define SUBOPT_RELID                           0x00008000
+#define SUBOPT_STATE                           0x00010000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -97,6 +99,8 @@ typedef struct SubOpts
        bool            runasowner;
        char       *origin;
        XLogRecPtr      lsn;
+       Oid                     relid;
+       char            state;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -353,6 +357,38 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
                        opts->specified_opts |= SUBOPT_LSN;
                        opts->lsn = lsn;
                }
+               else if (IsSet(supported_opts, SUBOPT_RELID) &&
+                                strcmp(defel->defname, "relid") == 0)
+               {
+                       Oid                     relid = defGetObjectId(defel);
+
+                       if (IsSet(opts->specified_opts, SUBOPT_RELID))
+                               errorConflictingDefElem(defel, pstate);
+
+                       if (!OidIsValid(relid))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("invalid relation identifier used")));
+
+                       opts->specified_opts |= SUBOPT_RELID;
+                       opts->relid = relid;
+               }
+               else if (IsSet(supported_opts, SUBOPT_STATE) &&
+                                strcmp(defel->defname, "state") == 0)
+               {
+                       char       *state_str = defGetString(defel);
+
+                       if (IsSet(opts->specified_opts, SUBOPT_STATE))
+                               errorConflictingDefElem(defel, pstate);
+
+                       if (strlen(state_str) != 1)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("invalid relation state used")));
+
+                       opts->specified_opts |= SUBOPT_STATE;
+                       opts->state = state_str[0];
+               }
                else
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
@@ -580,6 +616,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
        bits32          supported_opts;
        SubOpts         opts = {0};
        AclResult       aclresult;
+       RepOriginId     originid;
 
        /*
         * Parse and check options.
@@ -592,6 +629,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
                                          SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
                                          SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
                                          SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
+       if (IsBinaryUpgrade)
+               supported_opts |= SUBOPT_LSN;
        parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
        /*
@@ -718,7 +757,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
        recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
        ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
-       replorigin_create(originname);
+       originid = replorigin_create(originname);
+
+       if (IsBinaryUpgrade && IsSet(opts.specified_opts, SUBOPT_LSN))
+               replorigin_advance(originid, opts.lsn, InvalidXLogRecPtr,
+                                                       false /* backward */ ,
+                                                       false /* WAL log */ );
 
        /*
         * Connect to remote side to execute requested commands and fetch table
@@ -1428,6 +1472,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                                break;
                        }
 
+               case ALTER_SUBSCRIPTION_ADD_TABLE:
+                       {
+                               if (!IsBinaryUpgrade)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                                        errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not supported")));
+
+                               supported_opts = SUBOPT_RELID | SUBOPT_STATE | SUBOPT_LSN;
+                               parse_subscription_options(pstate, stmt->options,
+                                                                                  supported_opts, &opts);
+
+                               /* relid and state should always be provided. */
+                               Assert(IsSet(opts.specified_opts, SUBOPT_RELID));
+                               Assert(IsSet(opts.specified_opts, SUBOPT_STATE));
+
+                               AddSubscriptionRelState(subid, opts.relid, opts.state,
+                                                                               opts.lsn);
+
+                               break;
+                       }
+
                default:
                        elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
                                 stmt->kind);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index acf6cf4866..0432bf2cb4 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10695,6 +10695,17 @@ AlterSubscriptionStmt:
                                        n->options = $5;
                                        $$ = (Node *) n;
                                }
+                       /* for binary upgrade only */
+                       | ALTER SUBSCRIPTION name ADD_P TABLE definition
+                               {
+                                       AlterSubscriptionStmt *n =
+                                               makeNode(AlterSubscriptionStmt);
+
+                                       n->kind = ALTER_SUBSCRIPTION_ADD_TABLE;
+                                       n->subname = $3;
+                                       n->options = $6;
+                                       $$ = (Node *) n;
+                               }
                ;
 
 /*****************************************************************************
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index aba780ef4b..8a72a39d60 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -200,6 +200,8 @@ typedef struct _dumpOptions
 
        int                     sequence_data;  /* dump sequence data even in schema-only mode */
        int                     do_nothing;
+
+       bool            preserve_subscriptions;
 } DumpOptions;
 
 /*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7a504dfe25..b0d18689a6 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -431,6 +431,7 @@ main(int argc, char **argv)
                {"table-and-children", required_argument, NULL, 12},
                {"exclude-table-and-children", required_argument, NULL, 13},
                {"exclude-table-data-and-children", required_argument, NULL, 14},
+               {"preserve-subscription-state", no_argument, NULL, 15},
 
                {NULL, 0, NULL, 0}
        };
@@ -657,6 +658,10 @@ main(int argc, char **argv)
                                                                                  optarg);
                                break;
 
+                       case 15:                        /* include full subscription state */
+                               dopt.preserve_subscriptions = true;
+                               break;
+
                        default:
                                /* getopt_long already emitted a complaint */
                                pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -714,6 +719,10 @@ main(int argc, char **argv)
        if (dopt.do_nothing && dopt.dump_inserts == 0)
                pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
 
+       /* --preserve-subscription-state requires --binary-upgrade */
+       if (dopt.preserve_subscriptions && !dopt.binary_upgrade)
+               pg_fatal("option --preserve-subscription-state requires option --binary-upgrade");
+
        /* Identify archive format to emit */
        archiveFormat = parseArchiveFormat(format, &archiveMode);
 
@@ -4585,6 +4594,69 @@ is_superuser(Archive *fout)
        return false;
 }
 
+/*
+ * getSubscriptionRels
+ *       get information about the given subscription's relations
+ */
+static SubRelInfo *
+getSubscriptionRels(Archive *fout, Oid subid, int *nrels)
+{
+       SubRelInfo *rels;
+       PQExpBuffer query;
+       PGresult   *res;
+       int                     i_srrelid;
+       int                     i_srsubstate;
+       int                     i_srsublsn;
+       int                     i,
+                               ntups;
+
+       if (!fout->dopt->binary_upgrade || !fout->dopt->preserve_subscriptions)
+       {
+               *nrels = 0;
+
+               return NULL;
+       }
+
+       query = createPQExpBuffer();
+
+       appendPQExpBuffer(query, "SELECT srrelid, srsubstate, srsublsn "
+                                                               " FROM pg_subscription_rel"
+                                                               " WHERE srsubid = %u", subid);
+
+       res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+       ntups = PQntuples(res);
+       *nrels = ntups;
+
+       if (ntups == 0)
+       {
+               rels = NULL;
+               goto cleanup;
+       }
+
+       /*
+        * Get subscription relation fields.
+        */
+       i_srrelid = PQfnumber(res, "srrelid");
+       i_srsubstate = PQfnumber(res, "srsubstate");
+       i_srsublsn = PQfnumber(res, "srsublsn");
+
+       rels = pg_malloc(ntups * sizeof(SubRelInfo));
+
+       for (i = 0; i < ntups; i++)
+       {
+               rels[i].srrelid = atooid(PQgetvalue(res, i, i_srrelid));
+               rels[i].srsubstate = PQgetvalue(res, i, i_srsubstate)[0];
+               rels[i].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn));
+       }
+
+cleanup:
+       PQclear(res);
+       destroyPQExpBuffer(query);
+
+       return rels;
+}
+
 /*
  * getSubscriptions
  *       get information about subscriptions
@@ -4610,6 +4682,7 @@ getSubscriptions(Archive *fout)
        int                     i_subpublications;
        int                     i_subbinary;
        int                     i_subpasswordrequired;
+       int                     i_suboriginremotelsn;
        int                     i,
                                ntups;
 
@@ -4664,15 +4737,19 @@ getSubscriptions(Archive *fout)
        if (fout->remoteVersion >= 160000)
                appendPQExpBufferStr(query,
                                                         " s.suborigin,\n"
-                                                        " s.subpasswordrequired\n");
+                                                        " s.subpasswordrequired,\n");
        else
                appendPQExpBuffer(query,
                                                  " '%s' AS suborigin,\n"
-                                                 " 't' AS subpasswordrequired\n",
+                                                 " 't' AS subpasswordrequired,\n",
                                                  LOGICALREP_ORIGIN_ANY);
 
+       appendPQExpBufferStr(query, "o.remote_lsn\n");
+
        appendPQExpBufferStr(query,
                                                 "FROM pg_subscription s\n"
+                                                "LEFT JOIN pg_replication_origin_status o \n"
+                                                "    ON o.external_id = 'pg_' || s.oid::text \n"
                                                 "WHERE s.subdbid = (SELECT oid FROM pg_database\n"
                                                 "                   WHERE datname = current_database())");
 
@@ -4698,6 +4775,7 @@ getSubscriptions(Archive *fout)
        i_subdisableonerr = PQfnumber(res, "subdisableonerr");
        i_suborigin = PQfnumber(res, "suborigin");
        i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
+       i_suboriginremotelsn = PQfnumber(res, "remote_lsn");
 
        subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4730,6 +4808,15 @@ getSubscriptions(Archive *fout)
                subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
                subinfo[i].subpasswordrequired =
                        pg_strdup(PQgetvalue(res, i, i_subpasswordrequired));
+               if (PQgetisnull(res, i, i_suboriginremotelsn))
+                       subinfo[i].suboriginremotelsn = NULL;
+               else
+                       subinfo[i].suboriginremotelsn =
+                               pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
+
+               subinfo[i].subrels = getSubscriptionRels(fout,
+                                                                                 subinfo[i].dobj.catId.oid,
+                                                                                 &subinfo[i].nrels);
 
                /* Decide whether we want to dump it */
                selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -4814,9 +4901,31 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
        if (strcmp(subinfo->subpasswordrequired, "t") != 0)
                appendPQExpBuffer(query, ", password_required = false");
 
+       if (dopt->binary_upgrade && dopt->preserve_subscriptions &&
+               subinfo->suboriginremotelsn)
+       {
+               appendPQExpBuffer(query, ", lsn = '%s'", subinfo->suboriginremotelsn);
+       }
+
        appendPQExpBufferStr(query, ");\n");
 
        if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+       {
+               for (i = 0; i < subinfo->nrels; i++)
+               {
+                       appendPQExpBuffer(query, "\nALTER SUBSCRIPTION %s ADD TABLE "
+                                                                        "(relid = %u, state = '%c'",
+                                                                        qsubname,
+                                                                        subinfo->subrels[i].srrelid,
+                                                                        subinfo->subrels[i].srsubstate);
+
+                       if (subinfo->subrels[i].srsublsn[0] != '\0')
+                               appendPQExpBuffer(query, ", lsn = '%s'",
+                                                                 subinfo->subrels[i].srsublsn);
+
+                       appendPQExpBufferStr(query, ");");
+               }
+
                ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
                                         ARCHIVE_OPTS(.tag = subinfo->dobj.name,
                                                                  .owner = subinfo->rolname,
@@ -4824,6 +4933,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
                                                                  .section = SECTION_POST_DATA,
                                                                  .createStmt = query->data,
                                                                  .dropStmt = delq->data));
+       }
 
        if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
                dumpComment(fout, "SUBSCRIPTION", qsubname,
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index ed6ce41ad7..2f7e805cfc 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -647,6 +647,16 @@ typedef struct _PublicationSchemaInfo
        NamespaceInfo *pubschema;
 } PublicationSchemaInfo;
 
+/*
+ * The SubRelInfo struct is used to represent a subscription relation.
+ */
+typedef struct _SubRelInfo
+{
+       Oid             srrelid;
+       char    srsubstate;
+       char   *srsublsn;
+} SubRelInfo;
+
 /*
  * The SubscriptionInfo struct is used to represent subscription.
  */
@@ -664,6 +674,9 @@ typedef struct _SubscriptionInfo
        char       *subsynccommit;
        char       *subpublications;
        char       *subpasswordrequired;
+       char       *suboriginremotelsn;
+       int                     nrels;
+       SubRelInfo *subrels;
 } SubscriptionInfo;
 
 /*
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index fea159689e..1634b26175 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -20,6 +20,7 @@ static void check_is_install_user(ClusterInfo *cluster);
 static void check_proper_datallowconn(ClusterInfo *cluster);
 static void check_for_prepared_transactions(ClusterInfo *cluster);
 static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster);
+static void check_for_subscription_state(ClusterInfo *cluster);
 static void check_for_user_defined_postfix_ops(ClusterInfo *cluster);
 static void check_for_incompatible_polymorphics(ClusterInfo *cluster);
 static void check_for_tables_with_oids(ClusterInfo *cluster);
@@ -103,6 +104,8 @@ check_and_dump_old_cluster(bool live_check)
        check_for_composite_data_type_usage(&old_cluster);
        check_for_reg_data_type_usage(&old_cluster);
        check_for_isn_and_int8_passing_mismatch(&old_cluster);
+       if (user_opts.preserve_subscriptions)
+               check_for_subscription_state(&old_cluster);
 
        /*
         * PG 16 increased the size of the 'aclitem' type, which breaks the on-disk
@@ -785,6 +788,85 @@ check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster)
                check_ok();
 }
 
+/*
+ * check_for_subscription_state()
+ *
+ * Verify that all subscriptions have a valid remote_lsn and don't contain
+ * any table in a state different than ready.
+ */
+static void
+check_for_subscription_state(ClusterInfo *cluster)
+{
+       int                     dbnum;
+       bool            is_error = false;
+
+       Assert(user_opts.preserve_subscriptions);
+
+       /* No subscription before pg10. */
+       if (GET_MAJOR_VERSION(cluster->major_version) < 1000)
+               return;
+
+       prep_status("Checking for subscription state");
+
+       for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+       {
+               PGresult   *res;
+               int                     nb;
+               DbInfo     *active_db = &cluster->dbarr.dbs[dbnum];
+               PGconn     *conn = connectToServer(cluster, active_db->db_name);
+
+               /* We need to check for pg_replication_origin_status only once. */
+               if (dbnum == 0)
+               {
+                       res = executeQueryOrDie(conn,
+                                                                       "SELECT count(0) "
+                                                                       "FROM pg_catalog.pg_subscription s "
+                                                                       "LEFT JOIN pg_catalog.pg_replication_origin_status os"
+                                                                       "  ON os.external_id = 'pg_' || s.oid "
+                                                                       "WHERE coalesce(remote_lsn, '0/0') = '0/0'");
+
+                       if (PQntuples(res) != 1)
+                               pg_fatal("could not determine the number of remote origins with invalid remote_lsn");
+
+                       nb = atoi(PQgetvalue(res, 0, 0));
+                       if (nb != 0)
+                       {
+                               is_error = true;
+                               pg_log(PG_WARNING,
+                                          "\nWARNING:  %d subscription(s) have invalid remote_lsn",
+                                          nb);
+                       }
+                       PQclear(res);
+               }
+
+               res = executeQueryOrDie(conn,
+                                                               "SELECT count(0) "
+                                                               "FROM pg_catalog.pg_subscription_rel "
+                                                               "WHERE srsubstate != 'r'");
+
+               if (PQntuples(res) != 1)
+                       pg_fatal("could not determine the number of non-ready subscription relations");
+
+               nb = atoi(PQgetvalue(res, 0, 0));
+               if (nb != 0)
+               {
+                       is_error = true;
+                       pg_log(PG_WARNING,
+                                  "\nWARNING:  database \"%s\" has %d subscription "
+                                  "relation(s) in non-ready state", active_db->db_name, nb);
+               }
+
+               PQclear(res);
+               PQfinish(conn);
+       }
+
+       if (is_error)
+               pg_fatal("--preserve-subscription-state is incompatible with subscriptions "
+                                "having an invalid remote_lsn or relations in non-ready state");
+
+       check_ok();
+}
+
 /*
  * Verify that no user defined postfix operators exist.
  */
diff --git a/src/bin/pg_upgrade/dump.c b/src/bin/pg_upgrade/dump.c
index 6c8c82dca8..9284576af7 100644
--- a/src/bin/pg_upgrade/dump.c
+++ b/src/bin/pg_upgrade/dump.c
@@ -53,9 +53,10 @@ generate_old_dump(void)
 
                parallel_exec_prog(log_file_name, NULL,
                                                   "\"%s/pg_dump\" %s --schema-only --quote-all-identifiers "
-                                                  "--binary-upgrade --format=custom %s --file=\"%s/%s\" %s",
+                                                  "--binary-upgrade --format=custom %s %s --file=\"%s/%s\" %s",
                                                   new_cluster.bindir, cluster_conn_opts(&old_cluster),
                                                   log_opts.verbose ? "--verbose" : "",
+                                                  user_opts.preserve_subscriptions ? "--preserve-subscription-state" : "",
                                                   log_opts.dumpdir,
                                                   sql_file_name, escaped_connstr.data);
 
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..9ea25dec70 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_subscription.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 8869b6b60d..b033aa26ba 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -57,6 +57,7 @@ parseCommandLine(int argc, char *argv[])
                {"verbose", no_argument, NULL, 'v'},
                {"clone", no_argument, NULL, 1},
                {"copy", no_argument, NULL, 2},
+               {"preserve-subscription-state", no_argument, NULL, 3},
 
                {NULL, 0, NULL, 0}
        };
@@ -66,6 +67,7 @@ parseCommandLine(int argc, char *argv[])
 
        user_opts.do_sync = true;
        user_opts.transfer_mode = TRANSFER_MODE_COPY;
+       user_opts.preserve_subscriptions = false;
 
        os_info.progname = get_progname(argv[0]);
 
@@ -199,6 +201,10 @@ parseCommandLine(int argc, char *argv[])
                                user_opts.transfer_mode = TRANSFER_MODE_COPY;
                                break;
 
+                       case 3:
+                               user_opts.preserve_subscriptions = true;
+                               break;
+
                        default:
                                fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
                                                os_info.progname);
@@ -289,6 +295,7 @@ usage(void)
        printf(_("  -V, --version                 display version information, then exit\n"));
        printf(_("  --clone                       clone instead of copying files to new cluster\n"));
        printf(_("  --copy                        copy files to new cluster (default)\n"));
+       printf(_("  --preserve-subscription-state preserve the subscription state fully\n"));
        printf(_("  -?, --help                    show this help, then exit\n"));
        printf(_("\n"
                         "Before running pg_upgrade you must:\n"
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 3eea0139c7..131fd9a56e 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -304,6 +304,7 @@ typedef struct
        transferMode transfer_mode; /* copy files or link them? */
        int                     jobs;                   /* number of processes/threads to use */
        char       *socketdir;          /* directory to use for Unix sockets */
+       bool            preserve_subscriptions; /* fully transfer subscription state */
 } UserOpts;
 
 typedef struct
diff --git a/src/bin/pg_upgrade/t/003_subscription.pl b/src/bin/pg_upgrade/t/003_subscription.pl
new file mode 100644
index 0000000000..9328b3557b
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_subscription.pl
@@ -0,0 +1,204 @@
+# Copyright (c) 2022-2023, PostgreSQL Global Development Group
+
+# Test for pg_upgrade of logical subscription
+use strict;
+use warnings;
+
+use Cwd qw(abs_path);
+use File::Basename qw(dirname);
+use File::Compare;
+use File::Find qw(find);
+use File::Path qw(rmtree);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use PostgreSQL::Test::AdjustUpgrade;
+use Test::More;
+
+# Can be changed to test the other modes.
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize publisher node
+my $publisher = PostgreSQL::Test::Cluster->new('publisher');
+$publisher->init(allows_streaming => 'logical');
+$publisher->start;
+
+# Initialize the old subscriber node
+my $old_sub = PostgreSQL::Test::Cluster->new('old_sub');
+$old_sub->init;
+$old_sub->start;
+
+# Initialize the new subscriber
+my $new_sub = PostgreSQL::Test::Cluster->new('new_sub');
+$new_sub->init;
+my $bindir = $new_sub->config_data('--bindir');
+
+sub insert_line
+{
+       my $payload = shift;
+
+       foreach("t1", "t2")
+       {
+               $publisher->safe_psql('postgres',
+                       "INSERT INTO " . $_ . " (val) VALUES('$payload')");
+       }
+}
+
+# Initial setup
+foreach ("t1", "t2")
+{
+       $publisher->safe_psql('postgres',
+               "CREATE TABLE " . $_ . " (id serial, val text)");
+       $old_sub->safe_psql('postgres',
+               "CREATE TABLE " . $_ . " (id serial, val text)");
+}
+insert_line('before initial sync');
+
+# Setup logical replication, replicating only 1 table
+my $connstr = $publisher->connstr . ' dbname=postgres';
+
+$publisher->safe_psql('postgres',
+       "CREATE PUBLICATION pub FOR TABLE t1");
+
+$old_sub->safe_psql('postgres',
+       "CREATE SUBSCRIPTION sub CONNECTION '$connstr' PUBLICATION pub");
+
+# Wait for the catchup, as we need the subscription rel in ready state
+$old_sub->wait_for_subscription_sync($publisher, 'sub');
+
+# replication origin's remote_lsn isn't set if no data is replicated after the
+# initial sync
+command_fails(
+       [
+               'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+               '-D',         $new_sub->data_dir, '-b', $bindir,
+               '-B',         $bindir,            '-s', $new_sub->host,
+               '-p',         $old_sub->port,     '-P', $new_sub->port,
+               $mode,
+               '--preserve-subscription-state',
+               '--check',
+       ],
+       'run of pg_upgrade --check for old instance with invalid remote_lsn');
+ok(-d $new_sub->data_dir . "/pg_upgrade_output.d",
+       "pg_upgrade_output.d/ not removed after pg_upgrade failure");
+rmtree($new_sub->data_dir . "/pg_upgrade_output.d");
+
+# Make sure the replication origin is set
+insert_line('after initial sync');
+$publisher->wait_for_catchup('sub');
+
+my $result = $old_sub->safe_psql('postgres',
+    "SELECT COUNT(*) FROM pg_subscription_rel WHERE srsubstate != 'r'");
+is ($result, qq(0), "All tables in pg_subscription_rel should be in ready state");
+
+# Check the number of rows for each table on each server
+$result = $publisher->safe_psql('postgres',
+       "SELECT count(*) FROM t1");
+is ($result, qq(2), "Table t1 should have 2 rows on the publisher");
+$result = $publisher->safe_psql('postgres',
+       "SELECT count(*) FROM t2");
+is ($result, qq(2), "Table t2 should have 2 rows on the publisher");
+$result = $old_sub->safe_psql('postgres',
+       "SELECT count(*) FROM t1");
+is ($result, qq(2), "Table t1 should have 2 rows on the old subscriber");
+$result = $old_sub->safe_psql('postgres',
+       "SELECT count(*) FROM t2");
+is ($result, qq(0), "Table t2 should have 0 rows on the old subscriber");
+
+# Check that pg_upgrade refuses to upgrade subscription with non ready tables
+$old_sub->safe_psql('postgres',
+    "ALTER SUBSCRIPTION sub DISABLE");
+$old_sub->safe_psql('postgres',
+       "UPDATE pg_subscription_rel
+               SET srsubstate = 'i' WHERE srsubstate = 'r'");
+
+command_fails(
+       [
+               'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+               '-D',         $new_sub->data_dir, '-b', $bindir,
+               '-B',         $bindir,            '-s', $new_sub->host,
+               '-p',         $old_sub->port,     '-P', $new_sub->port,
+               $mode,
+               '--preserve-subscription-state',
+               '--check',
+       ],
+       'run of pg_upgrade --check for old instance with incorrect sub rel');
+ok(-d $new_sub->data_dir . "/pg_upgrade_output.d",
+       "pg_upgrade_output.d/ not removed after pg_upgrade failure");
+rmtree($new_sub->data_dir . "/pg_upgrade_output.d");
+
+# and otherwise works
+$old_sub->safe_psql('postgres',
+       "UPDATE pg_subscription_rel
+               SET srsubstate = 'r' WHERE srsubstate = 'i'");
+
+command_ok(
+       [
+               'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+               '-D',         $new_sub->data_dir, '-b', $bindir,
+               '-B',         $bindir,            '-s', $new_sub->host,
+               '-p',         $old_sub->port,     '-P', $new_sub->port,
+               $mode,
+               '--preserve-subscription-state',
+               '--check',
+       ],
+       'run of pg_upgrade --check for old instance with correct sub rel');
+
+# Stop the old subscriber, insert a row in each table while it's down and add
+# t2 to the publication
+$old_sub->stop;
+
+insert_line('while old_sub is down');
+
+# Run pg_upgrade
+command_ok(
+       [
+               'pg_upgrade', '--no-sync',        '-d', $old_sub->data_dir,
+               '-D',         $new_sub->data_dir, '-b', $bindir,
+               '-B',         $bindir,            '-s', $new_sub->host,
+               '-p',         $old_sub->port,     '-P', $new_sub->port,
+               $mode,
+               '--preserve-subscription-state',
+       ],
+       'run of pg_upgrade for new sub');
+ok( !-d $new_sub->data_dir . "/pg_upgrade_output.d",
+       "pg_upgrade_output.d/ removed after pg_upgrade success");
+$publisher->safe_psql('postgres',
+       "ALTER PUBLICATION pub ADD TABLE t2");
+
+$new_sub->start;
+
+# There should be no new replicated rows before enabling the subscription
+$result = $new_sub->safe_psql('postgres',
+       "SELECT count(*) FROM t1");
+is ($result, qq(2), "Table t1 should still have 2 rows on the new subscriber");
+$result = $new_sub->safe_psql('postgres',
+       "SELECT count(*) FROM t2");
+is ($result, qq(0), "Table t2 should still have 0 rows on the new subscriber");
+
+# Enable the subscription
+$new_sub->safe_psql('postgres',
+       "ALTER SUBSCRIPTION sub ENABLE");
+
+$publisher->wait_for_catchup('sub');
+
+# Rows on t1 should have been replicated
+$result = $new_sub->safe_psql('postgres',
+       "SELECT count(*) FROM t1");
+is ($result, qq(3), "Table t1 should now have 3 rows on the new subscriber");
+$result = $new_sub->safe_psql('postgres',
+       "SELECT count(*) FROM t2");
+is ($result, qq(0), "Table t2 should still have 0 rows on the new subscriber");
+
+# Refresh the subscription; only the missing rows on t2 should be replicated
+$new_sub->safe_psql('postgres',
+       "ALTER SUBSCRIPTION sub REFRESH PUBLICATION");
+$publisher->wait_for_catchup('sub');
+$result = $new_sub->safe_psql('postgres',
+       "SELECT count(*) FROM t1");
+is ($result, qq(3), "Table t1 should still have 3 rows on the new subscriber");
+$result = $new_sub->safe_psql('postgres',
+       "SELECT count(*) FROM t2");
+is ($result, qq(3), "Table t2 should now have 3 rows on the new subscriber");
+
+done_testing();
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index cc7b32b279..0ec85ceda2 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4028,7 +4028,8 @@ typedef enum AlterSubscriptionType
        ALTER_SUBSCRIPTION_DROP_PUBLICATION,
        ALTER_SUBSCRIPTION_REFRESH,
        ALTER_SUBSCRIPTION_ENABLED,
-       ALTER_SUBSCRIPTION_SKIP
+       ALTER_SUBSCRIPTION_SKIP,
+       ALTER_SUBSCRIPTION_ADD_TABLE
 } AlterSubscriptionType;
 
 typedef struct AlterSubscriptionStmt
-- 
2.37.0
