On Mon, Feb 20, 2023 at 03:07:37PM +0800, Julien Rouhaud wrote:
> On Mon, Feb 20, 2023 at 11:07:42AM +0530, Amit Kapila wrote:
> >
> > I think the current mechanism tries to provide more flexibility to the
> > users. OTOH, in some of the cases where users don't want to change
> > anything in the logical replication (both upstream and downstream
> > function as it is) after the upgrade then they need to do more work. I
> > think ideally there should be some option in pg_dump that allows us to
> > dump the contents of pg_subscription_rel as well, so that it is easier
> > for users to continue replication after the upgrade. We can then use
> > it for binary-upgrade mode as well.
>
> Is there really a use case for dumping the content of pg_subscription_rel
> outside of pg_upgrade?  I'm not particularly worried about the publisher going
> away or changing while pg_upgrade is running, but for a normal pg_dump /
> pg_restore I don't really see how anyone would actually want to resume logical
> replication from a pg_dump, especially since it's almost guaranteed that the
> node will already have consumed data from the publication that won't be in the
> dump in the first place.
>
> Are you ok with the suggested syntax above (probably with extra parens to avoid
> adding new keywords), or do you have some better suggestion?  I'm a bit worried
> about adding some O(n) commands, as it can add some noticeable slow-down for
> pg_upgrade-ing logical replicas, but I don't really see how to avoid that.  Note
> that if we make this option available to end-users, we will have to use the
> relation name rather than its oid, which will make this option even more
> expensive when restoring due to the extra lookups.
>
> For the pg_upgrade use-case, do you see any reason to not restore the
> pg_subscription_rel by default?  Maybe having an option to not restore it would
> make sense if it indeed adds noticeable overhead when publications have a lot of
> tables?

Since I didn't hear any objection, I worked on a POC patch with this approach.

For now, when pg_dump is invoked with --binary-upgrade, it always emits extra
commands to restore the relation list.  These commands are only allowed when the
server is started in binary upgrade mode.
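For illustration only, the per-relation output can be sketched as a standalone
C function.  This is a minimal sketch assuming a simplified copy of the patch's
SubRelInfo struct and a plain char buffer instead of a PQExpBuffer;
format_add_table() is a hypothetical name, not something that exists in
pg_dump.  As in the patch, an empty srsublsn string (which is what libpq's
PQgetvalue() returns for a NULL value) means the LSN option is left out.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef unsigned int Oid;

/* Simplified, hypothetical copy of the patch's SubRelInfo. */
typedef struct SubRelInfo
{
    Oid         srrelid;
    char        srsubstate;
    const char *srsublsn;   /* "" when the catalog value is NULL */
} SubRelInfo;

/*
 * Hypothetical helper: write one ALTER SUBSCRIPTION ... ADD TABLE command for
 * rel into buf.  The LSN option is only emitted when the catalog value was
 * not NULL, as in the patch.  Returns snprintf's result.
 */
static int
format_add_table(char *buf, size_t len, const char *qsubname,
                 const SubRelInfo *rel)
{
    assert(buf != NULL && qsubname != NULL && rel != NULL);

    if (rel->srsublsn[0] != '\0')
        return snprintf(buf, len,
                        "ALTER SUBSCRIPTION %s ADD TABLE "
                        "(RELID = %u, STATE = '%c', LSN = '%s');",
                        qsubname, rel->srrelid, rel->srsubstate,
                        rel->srsublsn);

    return snprintf(buf, len,
                    "ALTER SUBSCRIPTION %s ADD TABLE "
                    "(RELID = %u, STATE = '%c');",
                    qsubname, rel->srrelid, rel->srsubstate);
}
```

For a relation {16384, 'r', "0/1A2B3C8"} in subscription mysub, this produces
ALTER SUBSCRIPTION mysub ADD TABLE (RELID = 16384, STATE = 'r', LSN = '0/1A2B3C8');
and the unquoted option names are downcased by the parser, so they still match
"relid", "state" and "lsn".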

The new command is of the form

ALTER SUBSCRIPTION name ADD TABLE (relid = X, state = 'Y', lsn = 'Z/Z')

with the lsn part being optional.  I'm not sure if there should be some new
regression test for that, as it would be a bit costly.  Note that pg_upgrade of
a logical replica isn't covered by any regression test that I could find.
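The POC only checks that the state is a single character.  A stricter check
could look like the sketch below, assuming the state codes documented for
pg_subscription_rel.srsubstate (i, d, f, s, r) and the usual X/Y text form of
an LSN, where X and Y are the upper and lower 32 bits in hex.  Both
valid_subrel_state() and parse_lsn() are hypothetical standalone helpers; in
the backend itself one would more likely reuse the SUBREL_STATE_* constants
and the pg_lsn input routine.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint64_t XLogRecPtr;

/* Hypothetical helper: accept only the srsubstate codes of pg_subscription_rel. */
static bool
valid_subrel_state(const char *s)
{
    /* strlen() == 1 guarantees s[0] is not the terminator strchr would match */
    return s != NULL && strlen(s) == 1 && strchr("idfsr", s[0]) != NULL;
}

/*
 * Hypothetical helper: parse "X/Y" (each 1 to 8 hex digits) into a 64-bit
 * LSN, X being the upper and Y the lower 32 bits.  Returns false on bad input.
 */
static bool
parse_lsn(const char *str, XLogRecPtr *result)
{
    const char *hex = "0123456789abcdefABCDEF";
    size_t      len1,
                len2;

    assert(str != NULL && result != NULL);

    len1 = strspn(str, hex);
    if (len1 < 1 || len1 > 8 || str[len1] != '/')
        return false;

    len2 = strspn(str + len1 + 1, hex);
    if (len2 < 1 || len2 > 8 || str[len1 + 1 + len2] != '\0')
        return false;

    /* strtoul stops at the '/' for the first component */
    *result = ((XLogRecPtr) strtoul(str, NULL, 16) << 32) |
        (XLogRecPtr) strtoul(str + len1 + 1, NULL, 16);
    return true;
}
```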

I did test it manually though, and it fixes my original problem, allowing me to
safely resume logical replication by just re-enabling it.  I didn't do any
benchmarking to see how much overhead it adds.
From 18ccb63d223e020fd3027e2ddcbc997eb968c1ba Mon Sep 17 00:00:00 2001
From: Julien Rouhaud <julien.rouh...@free.fr>
Date: Wed, 22 Feb 2023 09:19:32 +0800
Subject: [PATCH v1] POC: Preserve the subscription relations during pg_upgrade

Previously, only the subscription information was preserved.  Without the list
of relations and their state, it's impossible to re-enable the subscriptions
without missing some records, as the list of relations can only be refreshed
after enabling the subscription (and therefore starting the apply worker).
Even if we added a way to refresh the subscription while enabling it, we
still wouldn't know which relations are new on the publication side, and
therefore should be fully synced, and which shouldn't.

To fix this problem, this patch teaches pg_dump in binary upgrade mode to emit
additional commands to be able to restore the content of pg_subscription_rel.

This new ALTER SUBSCRIPTION subcommand, usable only during binary upgrade, has
the following syntax:

ALTER SUBSCRIPTION name ADD TABLE (relid = XYZ, state = 'x' [, lsn = 'X/Y'])

The relation is identified by its oid, as it's preserved during pg_upgrade.
The lsn is optional, and defaults to NULL / InvalidXLogRecPtr.

Author: Julien Rouhaud
Reviewed-by: FIXME
Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud
---
 src/backend/commands/subscriptioncmds.c | 57 +++++++++++++++++
 src/backend/parser/gram.y               | 11 ++++
 src/bin/pg_dump/pg_dump.c               | 84 +++++++++++++++++++++++++
 src/bin/pg_dump/pg_dump.h               | 12 ++++
 src/include/nodes/parsenodes.h          |  3 +-
 5 files changed, 166 insertions(+), 1 deletion(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 464db6d247..7f2560faf8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -66,6 +66,8 @@
 #define SUBOPT_DISABLE_ON_ERR          0x00000400
 #define SUBOPT_LSN                                     0x00000800
 #define SUBOPT_ORIGIN                          0x00001000
+#define SUBOPT_RELID                           0x00002000
+#define SUBOPT_STATE                           0x00004000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -90,6 +92,8 @@ typedef struct SubOpts
        bool            disableonerr;
        char       *origin;
        XLogRecPtr      lsn;
+       Oid                     relid;
+       char            state;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -324,6 +328,38 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
                        opts->specified_opts |= SUBOPT_LSN;
                        opts->lsn = lsn;
                }
+               else if (IsSet(supported_opts, SUBOPT_RELID) &&
+                                strcmp(defel->defname, "relid") == 0)
+               {
+                       Oid                     relid = defGetObjectId(defel);
+
+                       if (IsSet(opts->specified_opts, SUBOPT_RELID))
+                               errorConflictingDefElem(defel, pstate);
+
+                       if (!OidIsValid(relid))
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("invalid relation identifier used")));
+
+                       opts->specified_opts |= SUBOPT_RELID;
+                       opts->relid = relid;
+               }
+               else if (IsSet(supported_opts, SUBOPT_STATE) &&
+                                strcmp(defel->defname, "state") == 0)
+               {
+                       char       *state_str = defGetString(defel);
+
+                       if (IsSet(opts->specified_opts, SUBOPT_STATE))
+                               errorConflictingDefElem(defel, pstate);
+
+                       if (strlen(state_str) != 1)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                                errmsg("invalid relation state used")));
+
+                       opts->specified_opts |= SUBOPT_STATE;
+                       opts->state = state_str[0];
+               }
                else
                        ereport(ERROR,
                                        (errcode(ERRCODE_SYNTAX_ERROR),
@@ -1341,6 +1377,27 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                                break;
                        }
 
+               case ALTER_SUBSCRIPTION_ADD_TABLE:
+                       {
+                               if (!IsBinaryUpgrade)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                                        errmsg("ALTER SUBSCRIPTION ... ADD TABLE is not supported")));
+
+                               supported_opts = SUBOPT_RELID | SUBOPT_STATE | SUBOPT_LSN;
+                               parse_subscription_options(pstate, stmt->options,
+                                                                                  supported_opts, &opts);
+
+                               /* relid and state must always be provided. */
+                               if (!IsSet(opts.specified_opts, SUBOPT_RELID | SUBOPT_STATE))
+                                       elog(ERROR, "relid and state must be specified");
+
+                               AddSubscriptionRelState(subid, opts.relid, opts.state,
+                                                                               opts.lsn);
+
+                               break;
+                       }
+
                default:
                        elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d",
                                 stmt->kind);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a0138382a1..0a3448c487 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10670,6 +10670,17 @@ AlterSubscriptionStmt:
                                        n->options = $5;
                                        $$ = (Node *) n;
                                }
+                       /* for binary upgrade only */
+                       | ALTER SUBSCRIPTION name ADD_P TABLE definition
+                               {
+                                       AlterSubscriptionStmt *n =
+                                               makeNode(AlterSubscriptionStmt);
+
+                                       n->kind = ALTER_SUBSCRIPTION_ADD_TABLE;
+                                       n->subname = $3;
+                                       n->options = $6;
+                                       $$ = (Node *) n;
+                               }
                ;
 
 /*****************************************************************************
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 527c7651ab..61f54ee549 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4470,6 +4470,69 @@ is_superuser(Archive *fout)
        return false;
 }
 
+/*
+ * getSubscriptionRels
+ *       get information about the given subscription's relations
+ */
+static SubRelInfo *
+getSubscriptionRels(Archive *fout, Oid subid, int *nrels)
+{
+       SubRelInfo *rels;
+       PQExpBuffer query;
+       PGresult   *res;
+       int                     i_srrelid;
+       int                     i_srsubstate;
+       int                     i_srsublsn;
+       int                     i,
+                               ntups;
+
+       if (!fout->dopt->binary_upgrade)
+       {
+               *nrels = 0;
+
+               return NULL;
+       }
+
+       query = createPQExpBuffer();
+
+       appendPQExpBuffer(query, "SELECT srrelid, srsubstate, srsublsn "
+       appendPQExpBuffer(query, "SELECT srrelid, srsubstate, srsublsn"
+                                         " FROM pg_subscription_rel"
+                                         " WHERE srsubid = %u", subid);
+
+       res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+       ntups = PQntuples(res);
+       *nrels = ntups;
+
+       if (ntups == 0)
+       {
+               rels = NULL;
+               goto cleanup;
+       }
+
+       /*
+        * Get subscription relation fields.
+        */
+       i_srrelid = PQfnumber(res, "srrelid");
+       i_srsubstate = PQfnumber(res, "srsubstate");
+       i_srsublsn = PQfnumber(res, "srsublsn");
+
+       rels = pg_malloc(ntups * sizeof(SubRelInfo));
+
+       for (i = 0; i < ntups; i++)
+       {
+               rels[i].srrelid = atooid(PQgetvalue(res, i, i_srrelid));
+               rels[i].srsubstate = PQgetvalue(res, i, i_srsubstate)[0];
+               rels[i].srsublsn = pg_strdup(PQgetvalue(res, i, i_srsublsn));
+       }
+
+cleanup:
+       PQclear(res);
+       destroyPQExpBuffer(query);
+
+       return rels;
+}
+
 /*
  * getSubscriptions
  *       get information about subscriptions
@@ -4607,6 +4670,10 @@ getSubscriptions(Archive *fout)
                        pg_strdup(PQgetvalue(res, i, i_subdisableonerr));
                subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin));
 
+               subinfo[i].subrels = getSubscriptionRels(fout,
+                                                                                                subinfo[i].dobj.catId.oid,
+                                                                                                &subinfo[i].nrels);
+
                /* Decide whether we want to dump it */
                selectDumpableObject(&(subinfo[i].dobj), fout);
        }
@@ -4690,6 +4757,22 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
        appendPQExpBufferStr(query, ");\n");
 
        if (subinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+       {
+               for (i = 0; i < subinfo->nrels; i++)
+               {
+                       appendPQExpBuffer(query, "\nALTER SUBSCRIPTION %s ADD TABLE "
+                                                         "(RELID = %u, STATE = '%c'",
+                                                         qsubname,
+                                                         subinfo->subrels[i].srrelid,
+                                                         subinfo->subrels[i].srsubstate);
+
+                       if (subinfo->subrels[i].srsublsn[0] != '\0')
+                               appendPQExpBuffer(query, ", LSN = '%s'",
+                                                                 subinfo->subrels[i].srsublsn);
+
+                       appendPQExpBufferStr(query, ");");
+               }
+
                ArchiveEntry(fout, subinfo->dobj.catId, subinfo->dobj.dumpId,
                                         ARCHIVE_OPTS(.tag = subinfo->dobj.name,
                                                                  .owner = subinfo->rolname,
@@ -4697,6 +4780,7 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
                                                                  .section = SECTION_POST_DATA,
                                                                  .createStmt = query->data,
                                                                  .dropStmt = delq->data));
+       }
 
        if (subinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
                dumpComment(fout, "SUBSCRIPTION", qsubname,
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index e7cbd8d7ed..03fb0dafe0 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -646,6 +646,16 @@ typedef struct _PublicationSchemaInfo
        NamespaceInfo *pubschema;
 } PublicationSchemaInfo;
 
+/*
+ * The SubRelInfo struct is used to represent subscription relation.
+ */
+typedef struct _SubRelInfo
+{
+       Oid             srrelid;
+       char    srsubstate;
+       char   *srsublsn;
+} SubRelInfo;
+
 /*
  * The SubscriptionInfo struct is used to represent subscription.
  */
@@ -662,6 +672,8 @@ typedef struct _SubscriptionInfo
        char       *suborigin;
        char       *subsynccommit;
        char       *subpublications;
+       int                     nrels;
+       SubRelInfo *subrels;
 } SubscriptionInfo;
 
 /*
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index f7d7f10f7d..8f66307287 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3917,7 +3917,8 @@ typedef enum AlterSubscriptionType
        ALTER_SUBSCRIPTION_DROP_PUBLICATION,
        ALTER_SUBSCRIPTION_REFRESH,
        ALTER_SUBSCRIPTION_ENABLED,
-       ALTER_SUBSCRIPTION_SKIP
+       ALTER_SUBSCRIPTION_SKIP,
+       ALTER_SUBSCRIPTION_ADD_TABLE
 } AlterSubscriptionType;
 
 typedef struct AlterSubscriptionStmt
-- 
2.37.0
