On Tue, Oct 21, 2025 at 8:11 PM vignesh C <[email protected]> wrote:
>
> On Tue, 21 Oct 2025 at 03:49, Masahiko Sawada <[email protected]> wrote:
>
> > ---
> >  /*
> > - * Check and log a warning if the publisher has subscribed to the same 
> > table,
> > - * its partition ancestors (if it's a partition), or its partition 
> > children (if
> > - * it's a partitioned table), from some other publishers. This check is
> > - * required in the following scenarios:
> > + * Check and log a warning if the publisher has subscribed to the same 
> > relation
> > + * (table or sequence), its partition ancestors (if it's a partition), or 
> > its
> > + * partition children (if it's a partitioned table), from some other
> > publishers.
> > + * This check is required in the following scenarios:
> >   *
> >   * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH 
> > PUBLICATION
> >   *    statements with "copy_data = true" and "origin = none":
> >   *    - Warn the user that data with an origin might have been copied.
> > - *    - This check is skipped for tables already added, as incremental 
> > sync via
> > - *      WAL allows origin tracking. The list of such tables is in
> > - *      subrel_local_oids.
> > + *    - This check is skipped for tables and sequences already added, as
> > + *      incremental sync via WAL allows origin tracking. The list of
> > such tables
> > + *      is in subrel_local_oids.
> >   *
> >   * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH 
> > PUBLICATION
> >   *    statements with "retain_dead_tuples = true" and "origin = any", and 
> > for
> > @@ -2338,13 +2440,19 @@ AlterSubscriptionOwner_oid(Oid subid, Oid 
> > newOwnerId)
> >   *    - Warn the user that only conflict detection info for local changes 
> > on
> >   *      the publisher is retained. Data from other origins may lack 
> > sufficient
> >   *      details for reliable conflict detection.
> > + *    - This check targets for tables only.
> >   *    - See comments atop worker.c for more details.
> > + *
> > + * 3) For ALTER SUBSCRIPTION ... REFRESH SEQUENCE statements with "origin =
> > + *    none":
> > + *    - Warn the user that sequence data from another origin might have 
> > been
> > + *      copied.
> >   */
> >
> > While this function is well documented, I find it's quite complex, and
> > this patch adds to that complexity. The function has 9 arguments,
> > making it difficult to understand which combinations of arguments
> > enable which checks. For example, the function header comment doesn't
> > explain when to use the only_sequences parameter. At first, I thought
> > only_sequences should be set to true when checking if the publisher
> > has subscribed to sequences from other publishers, but looking at the
> > code, I discovered it doesn't check sequences when check_rdt is true:
> >
> > +   if (walrcv_server_version(wrconn) < 190000 || check_rdt)
> > +       appendStringInfo(&cmd, query,
> > +                        "(SELECT relid, TRUE as istable FROM
> > pg_get_publication_tables(P.pubname))");
> > +   else if (only_sequences)
> > +       appendStringInfo(&cmd, query,
> > +                        "(SELECT relid, FALSE as istable FROM
> > pg_get_publication_sequences(P.pubname))");
> > +   else
> > +       appendStringInfo(&cmd, query,
> > +                        "(SELECT relid, TRUE as istable FROM
> > pg_get_publication_tables(P.pubname) UNION ALL"
> > +                        " SELECT relid, FALSE as istable FROM
> > pg_get_publication_sequences(P.pubname))");
> > +
> >
> > I find that the complexity might stem from checking different cases in
> > one function,  but I don't have better ideas to improve the logic for
> > now. I think we can at least describe what the caller can expect from
> > specifying only_sequence to true.
>
> Split this function into check_publications_origin_sequences and
> check_publications_origin_tables to reduce the complexity. After this
> change, we log two warnings if the publisher has subscribed to both
> the same tables and the same sequences, like:
>

I think the case where both WARNINGs are displayed is rare, so this
should be okay, as it simplifies the code quite a bit. Another thing is
that we need to run the query twice, but as this happens during DDL and
only in very specific cases, that should also be okay. We can always
merge these later if we see any problem with it, but for now it is
better to prefer code simplicity.

When check_publications_origin_sequences() is called from the ALTER
SUBSCRIPTION ... REFRESH PUBLICATION or CREATE SUBSCRIPTION code path,
shouldn't we check copy_data as well, along with origin being none?
If copy_data is false, the sequence would have been added in the
READY state, so we don't need to fetch its values.
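
To make the question above concrete, here is a minimal standalone sketch of the guard being suggested. It is not part of the attached patch: the helper name sequence_origin_check_needed, the ORIGIN_NONE macro (a stand-in for LOGICALREP_ORIGIN_NONE, with its value assumed), and the use of POSIX strcasecmp in place of pg_strcasecmp are all assumptions made so the snippet compiles on its own.

```c
#include <stdbool.h>
#include <stddef.h>
#include <strings.h>

/* Stand-in for LOGICALREP_ORIGIN_NONE; the value is assumed for this sketch. */
#define ORIGIN_NONE "none"

/*
 * Hypothetical helper: returns true only when the sequence origin check
 * would need to run, i.e. when sequence values are going to be fetched
 * (copy_data is true) and the subscription uses origin = none.
 */
static bool
sequence_origin_check_needed(const char *origin, bool copy_data)
{
	/* With copy_data = false nothing is fetched, so there is nothing to warn about. */
	if (!copy_data)
		return false;

	/* The warning is only relevant for origin = none (compared case-insensitively). */
	if (origin == NULL || strcasecmp(origin, ORIGIN_NONE) != 0)
		return false;

	return true;
}
```

Under this sketch, a REFRESH SEQUENCES caller would presumably pass copy_data as true, since that command always fetches sequence values; that is an assumption about the caller, not something the patch states.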

I have added a few comments in this new function and made a number of
other cosmetic improvements in the attached.

-- 
With Regards,
Amit Kapila.
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 73b068dd31c..267b898dca1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -113,18 +113,18 @@ typedef struct PublicationRelKind
 } PublicationRelKind;
 
 static List *fetch_relation_list(WalReceiverConn *wrconn, List *publications);
-static void check_publications_origin_sequences(WalReceiverConn *wrconn,
-                                                                               
                List *publications,
-                                                                               
                char *origin,
-                                                                               
                Oid *subrel_local_oids,
-                                                                               
                int subrel_count,
-                                                                               
                char *subname);
 static void check_publications_origin_tables(WalReceiverConn *wrconn,
                                                                                
         List *publications, bool copydata,
                                                                                
         bool retain_dead_tuples,
                                                                                
         char *origin,
                                                                                
         Oid *subrel_local_oids,
                                                                                
         int subrel_count, char *subname);
+static void check_publications_origin_sequences(WalReceiverConn* wrconn,
+                                                                               
                List *publications,
+                                                                               
                char *origin,
+                                                                               
                Oid *subrel_local_oids,
+                                                                               
                int subrel_count,
+                                                                               
                char *subname);
 static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
@@ -979,9 +979,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
                        else
                                subrel_local_oids[tbl_count++] = 
relstate->relid;
                }
-               qsort(subrel_local_oids, tbl_count,
-                         sizeof(Oid), oid_cmp);
 
+               qsort(subrel_local_oids, tbl_count, sizeof(Oid), oid_cmp);
                check_publications_origin_tables(wrconn, sub->publications, 
copy_data,
                                                                                
 sub->retaindeadtuples, sub->origin,
                                                                                
 subrel_local_oids, tbl_count,
@@ -2461,106 +2460,6 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
        table_close(rel, RowExclusiveLock);
 }
 
-static void
-check_publications_origin_sequences(WalReceiverConn *wrconn, List 
*publications,
-                                                                       char 
*origin, Oid *subrel_local_oids,
-                                                                       int 
subrel_count, char *subname)
-{
-       WalRcvExecResult *res;
-       StringInfoData cmd;
-       TupleTableSlot *slot;
-       int                     i;
-       Oid                     tableRow[1] = {TEXTOID};
-       List       *publist = NIL;
-
-       /* Enable sequence synchronization checks only when origin is 'none' */
-       if (!origin || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)
-               return;
-
-       initStringInfo(&cmd);
-       appendStringInfoString(&cmd,
-                                                  "SELECT DISTINCT P.pubname 
AS pubname\n"
-                                                  "FROM pg_publication P,\n"
-                                                  "     LATERAL 
pg_get_publication_sequences(P.pubname) GPR\n"
-                                                  "     JOIN 
pg_subscription_rel PS ON (GPR.relid = PS.srrelid),\n"
-                                                  "     pg_class C JOIN 
pg_namespace N ON (N.oid = C.relnamespace)\n"
-                                                  "WHERE C.oid = GPR.relid "
-                                                  "     AND P.pubname IN (");
-
-       GetPublicationsStr(publications, &cmd, true);
-       appendStringInfoString(&cmd, ")\n");
-
-       /*
-        * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
-        * subrel_local_oids contains the list of relations that are already
-        * present on the subscriber. This check should be skipped as these will
-        * not be re-synced.
-        */
-       for (i = 0; i < subrel_count; i++)
-       {
-               Oid                     relid = subrel_local_oids[i];
-               char       *schemaname = 
get_namespace_name(get_rel_namespace(relid));
-               char       *tablename = get_rel_name(relid);
-
-               appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname 
= '%s')\n",
-                                                       schemaname, tablename);
-       }
-
-       res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
-       pfree(cmd.data);
-
-       if (res->status != WALRCV_OK_TUPLES)
-               ereport(ERROR,
-                               (errcode(ERRCODE_CONNECTION_FAILURE),
-                                errmsg("could not receive list of replicated 
relations from the publisher: %s",
-                                               res->err)));
-
-       /* Process relations. */
-       slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
-       while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
-       {
-               char       *pubname;
-               bool            isnull;
-
-               pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
-               Assert(!isnull);
-
-               ExecClearTuple(slot);
-               publist = list_append_unique(publist, makeString(pubname));
-       }
-
-       /*
-        * Log a warning if the publisher has subscribed to the same sequence 
from
-        * some other publisher. We cannot know the origin of sequences data 
during
-        * the initial sync.
-        */
-       if (publist)
-       {
-               StringInfo      pubnames = makeStringInfo();
-               StringInfo      err_msg = makeStringInfo();
-               StringInfo      err_hint = makeStringInfo();
-
-               /* Prepare the list of publication(s) for warning message. */
-               GetPublicationsStr(publist, pubnames, false);
-
-               appendStringInfo(err_msg, _("subscription \"%s\" requested 
copy_data with origin = NONE but might copy data that had a different origin"),
-                                                subname);
-               appendStringInfoString(err_hint, _("Verify that initial data 
copied from the publisher sequences did not come from other origins."));
-
-               ereport(WARNING,
-                               
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                               errmsg_internal("%s", err_msg->data),
-                               errdetail_plural("The subscription subscribes 
to a publication (%s) that contains sequences that are written to by other 
subscriptions.",
-                                                                "The 
subscription subscribes to publications (%s) that contain sequences that are 
written to by other subscriptions.",
-                                                                
list_length(publist), pubnames->data),
-                               errhint_internal("%s", err_hint->data));
-       }
-
-       ExecDropSingleTupleTableSlot(slot);
-
-       walrcv_clear_result(res);
-}
-
 /*
  * Check and log a warning if the publisher has subscribed to the same table,
  * its partition ancestors (if it's a partition), or its partition children (if
@@ -2726,6 +2625,116 @@ check_publications_origin_tables(WalReceiverConn *wrconn, List *publications,
        walrcv_clear_result(res);
 }
 
+/*
+ * This function is similar to check_publications_origin_tables and serves
+ * the same purpose for sequences.
+ *
+ * In addition to the checks where check_publications_origin_tables is used,
+ * this function is also used for ALTER SUBSCRIPTION ... REFRESH SEQUENCES.
+ */
+static void
+check_publications_origin_sequences(WalReceiverConn* wrconn, List* 
publications,
+                                                                       char 
*origin, Oid *subrel_local_oids,
+                                                                       int 
subrel_count, char *subname)
+{
+       WalRcvExecResult *res;
+       StringInfoData cmd;
+       TupleTableSlot *slot;
+       int                     i;
+       Oid                     tableRow[1] = { TEXTOID };
+       List *publist = NIL;
+
+       /*
+        * Enable sequence synchronization checks only when origin is 'none', to
+        * ensure that sequence data from other origins is not inadvertently
+        * copied.
+        */
+       if (!origin || pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)
+               return;
+
+       initStringInfo(&cmd);
+       appendStringInfoString(&cmd,
+                                                  "SELECT DISTINCT P.pubname 
AS pubname\n"
+                                                  "FROM pg_publication P,\n"
+                                                  "     LATERAL 
pg_get_publication_sequences(P.pubname) GPR\n"
+                                                  "     JOIN 
pg_subscription_rel PS ON (GPR.relid = PS.srrelid),\n"
+                                                  "     pg_class C JOIN 
pg_namespace N ON (N.oid = C.relnamespace)\n"
+                                                  "WHERE C.oid = GPR.relid AND 
P.pubname IN (");
+
+       GetPublicationsStr(publications, &cmd, true);
+       appendStringInfoString(&cmd, ")\n");
+
+       /*
+        * In case of ALTER SUBSCRIPTION ... REFRESH PUBLICATION,
+        * subrel_local_oids contains the list of relations that are already
+        * present on the subscriber. This check should be skipped as these will
+        * not be re-synced.
+        */
+       for (i = 0; i < subrel_count; i++)
+       {
+               Oid                     relid = subrel_local_oids[i];
+               char            *schemaname = 
get_namespace_name(get_rel_namespace(relid));
+               char            *seqname = get_rel_name(relid);
+
+               appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname 
= '%s')\n",
+                                                schemaname, seqname);
+       }
+
+       res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
+       pfree(cmd.data);
+
+       if (res->status != WALRCV_OK_TUPLES)
+               ereport(ERROR,
+                               (errcode(ERRCODE_CONNECTION_FAILURE),
+                                errmsg("could not receive list of replicated 
relations from the publisher: %s",
+                                               res->err)));
+
+       /* Process relations. */
+       slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+       while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+       {
+               char* pubname;
+               bool            isnull;
+
+               pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+               Assert(!isnull);
+
+               ExecClearTuple(slot);
+               publist = list_append_unique(publist, makeString(pubname));
+       }
+
+       /*
+        * Log a warning if the publisher has subscribed to the same sequence 
from
+        * some other publisher. We cannot know the origin of sequences data 
during
+        * the initial sync.
+        */
+       if (publist)
+       {
+               StringInfo      pubnames = makeStringInfo();
+               StringInfo      err_msg = makeStringInfo();
+               StringInfo      err_hint = makeStringInfo();
+
+               /* Prepare the list of publication(s) for warning message. */
+               GetPublicationsStr(publist, pubnames, false);
+
+               appendStringInfo(err_msg, _("subscription \"%s\" requested 
copy_data with origin = NONE but might copy data that had a different origin"),
+                                                subname);
+               appendStringInfoString(err_hint, _("Verify that initial data 
copied from the publisher sequences did not come from other origins."));
+
+               ereport(WARNING,
+                               
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                               errmsg_internal("%s", err_msg->data),
+                               errdetail_plural("The subscription subscribes 
to a publication (%s) that contains sequences that are written to by other 
subscriptions.",
+                                                                "The 
subscription subscribes to publications (%s) that contain sequences that are 
written to by other subscriptions.",
+                                                                
list_length(publist), pubnames->data),
+                               errhint_internal("%s", err_hint->data));
+       }
+
+       ExecDropSingleTupleTableSlot(slot);
+
+       walrcv_clear_result(res);
+}
+
 /*
  * Determine whether the retain_dead_tuples can be enabled based on the
  * publisher's status.
