On Mon, Jun 26, 2023 at 8:35 PM Tomas Vondra <tomas.von...@enterprisedb.com> wrote:
> On 6/26/23 15:18, Ashutosh Bapat wrote:
> > I will look at 0004 next.
> >
>
> OK

0004 is quite large. I think if we split this into two or even three
1. publication and subscription catalog handling
2. built-in replication protocol changes,
it might be easier to review. But anyway, I have given it one read. I have reviewed the parts which deal with the replication proper in detail. I have *not* thoroughly reviewed the parts which deal with the catalogs, pg_dump, describe and tab completion. Similarly the tests. If those parts need a thorough review, please let me know.

But before jumping into the comments, a weird scenario I tried. On the publisher I created a table t1(a int, b int) and a sequence s and added both to a publication. On the subscriber I swapped their names, i.e. created a table s(a int, b int) and a sequence t1, and subscribed to the publication. The subscription was created, and during replication it threw the errors

    logical replication target relation "public.t1" is missing replicated columns: "a", "b"

and

    logical replication target relation "public.s" is missing replicated columns: "last_value", "log_cnt", "is_called"

I think it's good that it at least threw an error. But it would be good if it detected that the reltypes themselves are different and mentioned that in the error. Something like

    logical replication target "public.s" is not a sequence like source "public.s"

Comments on the patch itself.

I didn't find any mention of 'sequence' in the documentation of the publish option in CREATE or ALTER PUBLICATION. Is something missing in the documentation? But do we really need to record "sequence" as an operation? Just adding the sequences to the publication should be fine, right? There's only one operation on sequences: updating the sequence row.

+CREATE VIEW pg_publication_sequences AS
+    SELECT
+        P.pubname AS pubname,
+        N.nspname AS schemaname,
+        C.relname AS sequencename

If we report oid or regclass for sequences it might be easier to join the view further.
We don't have reg* for publication so we report both oid and name of publication.

+/*
+ * Update the sequence state by modifying the existing sequence data row.
+ *
+ * This keeps the same relfilenode, so the behavior is non-transactional.
+ */
+static void
+SetSequence_non_transactional(Oid seqrelid, int64 last_value, int64 log_cnt, bool is_called)

This function has some code similar to nextval but with the sequence of operations (viz. changes to buffer, WAL insert and cache update) changed. Given the comments in nextval_internal(), the difference in the sequence of operations should not make a difference in the end result. But I think it will be good to deduplicate the code to avoid confusion and also for ease of maintenance.

+
+/*
+ * Update the sequence state by creating a new relfilenode.
+ *
+ * This creates a new relfilenode, to allow transactional behavior.
+ */
+static void
+SetSequence_transactional(Oid seq_relid, int64 last_value, int64 log_cnt, bool is_called)

Need some deduplication here as well. But there are fewer similarities with AlterSequence, ResetSequence or DefineSequence.

@@ -730,9 +731,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
         {
             /*
-             * Get the table list from publisher and build local table status
-             * info.
+             * Get the table and sequence list from publisher and build
+             * local relation sync status info.
              */
-            tables = fetch_table_list(wrconn, publications);
-            foreach(lc, tables)
+            relations = fetch_table_list(wrconn, publications);

Is it allowed to connect a newer subscriber to an old publisher? If yes, the query to fetch sequences will throw an error since it won't find the catalog.

@@ -882,8 +886,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
-    /* Get the table list from publisher. */
+    /* Get the list of relations from publisher. */
     pubrel_names = fetch_table_list(wrconn, sub->publications);
+    pubrel_names = list_concat(pubrel_names,
+                               fetch_sequence_list(wrconn, sub->publications));

Similarly here.

+void
+logicalrep_write_sequence(StringInfo out, Relation rel, TransactionId xid,
... snip ...
+    pq_sendint8(out, flags);
+    pq_sendint64(out, lsn);
... snip ...
+LogicalRepRelId
+logicalrep_read_sequence(StringInfo in, LogicalRepSequence *seqdata)
+{
... snip ...
+    /* XXX skipping flags and lsn */
+    pq_getmsgint(in, 1);
+    pq_getmsgint64(in);

We are ignoring these two fields on the WAL receiver side. I don't see such fields being part of INSERT, UPDATE or DELETE messages. Should we just drop those or do they have some future use? Two LSNs are written by OutputPrepareWrite() as a prologue to the logical message. If this LSN is one of them, it could be dropped anyway.

+static void
+fetch_sequence_data(char *nspname, char *relname,
... snip ...
+    appendStringInfo(&cmd, "SELECT last_value, log_cnt, is_called\n"
+                     " FROM %s", quote_qualified_identifier(nspname, relname));

We are using an undocumented interface here. SELECT ... FROM <sequence> is not documented. This code will break if we change the way a sequence is stored. That is quite unlikely but not impossible. Ideally we should use one of the methods documented at [1]. But none of them provides what is needed per your comment in copy_sequence(), i.e. the state of the sequence as of the last WAL record on that sequence. So I don't have any better ideas than what's done in the patch. Maybe we can use "nextval() + 32" as an approximation.

Some minor comments and nitpicks:

@@ -1958,12 +1958,14 @@ get_object_address_publication_schema(List *object, bool missing_ok)

Need an update to the function prologue with the description of the third element. Also the error message at the end of the function needs to mention the object type.
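
Returning to the question above about a newer subscriber connecting to an older publisher: one way to avoid a failing catalog query is to gate the sequence fetch on the publisher's reported version number. The following is only a minimal sketch under stated assumptions. The helper name and the cutoff constant are hypothetical and not taken from the patch; in the tree the number would presumably come from walrcv_server_version(wrconn), and the real cutoff depends on the release the feature lands in.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical cutoff: the first server version number whose catalogs know
 * about sequences in publications. 170000 is only a placeholder value.
 */
#define FIRST_VERSION_WITH_PUBLISHED_SEQUENCES 170000

/*
 * Decide whether the subscriber should ask the publisher for its list of
 * published sequences. server_version uses PostgreSQL's numeric form,
 * e.g. 150003 for 15.3.
 */
bool
publisher_supports_sequences(int server_version)
{
    /* A connected server always reports a positive version number. */
    assert(server_version > 0);

    return server_version >= FIRST_VERSION_WITH_PUBLISHED_SEQUENCES;
}
```

With a check like this, the subscriber would simply skip the sequence query against an older publisher and treat the sequence list as empty.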
-    appendStringInfo(&buffer, _("publication of schema %s in publication %s"),
-                     nspname, pubname);
+    appendStringInfo(&buffer, _("publication of schema %s in publication %s type %s"),
+                     nspname, pubname, objtype);

s/type/for object type/ ?

@@ -5826,18 +5842,24 @@ getObjectIdentityParts(const ObjectAddress *object,
             break;
-            appendStringInfo(&buffer, "%s in publication %s",
-                             nspname, pubname);
+            appendStringInfo(&buffer, "%s in publication %s type %s",
+                             nspname, pubname, objtype);

s/type/object type/? ... in some other places as well?

+/*
+ * Check the character is a valid object type for schema publication.
+ *
+ * This recognizes either 't' for tables or 's' for sequences. Places that
+ * need to handle 'u' for unsupported relkinds need to do that explicitlyl

s/explicitlyl/explicitly/

+Datum
+pg_get_publication_sequences(PG_FUNCTION_ARGS)
+{
... snip ...
+    /*
+     * Publications support partitioned tables, although all changes are
+     * replicated using leaf partition identity and schema, so we only
+     * need those.
+     */

Not relevant here.

+    if (publication->allsequences)
+        sequences = GetAllSequencesPublicationRelations();
+    else
+    {
+        List *relids,
+             *schemarelids;
+
+        relids = GetPublicationRelations(publication->oid,
+                                         PUB_OBJTYPE_SEQUENCE,
+                                         publication->pubviaroot ?
+                                         PUBLICATION_PART_ROOT :
+                                         PUBLICATION_PART_LEAF);
+        schemarelids = GetAllSchemaPublicationRelations(publication->oid,
+                                                        PUB_OBJTYPE_SEQUENCE,
+                                                        publication->pubviaroot ?
+                                                        PUBLICATION_PART_ROOT :
+                                                        PUBLICATION_PART_LEAF);

I think we should just pass PUBLICATION_PART_ALL since that parameter is irrelevant to sequences anyway. Otherwise this code would be confusing.

I think we should rename the PublicationTable structure to PublicationRelation since it can now contain information about a table or a sequence, both of which are relations.

+/*
+ * Add or remove table to/from publication.

s/table/sequence/. Generally this applies to all the code that works for tables and was copied and modified for sequences.
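
The 't' / 's' / 'u' convention from the comment quoted above (the one with the "explicitlyl" typo) can be sketched as follows. This is an illustration only: the three character codes come from that comment, while the macro and function names are hypothetical and do not appear in the patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Character codes from the quoted comment; the macro names are hypothetical. */
#define OBJTYPE_TABLE        't'
#define OBJTYPE_SEQUENCE     's'
#define OBJTYPE_UNSUPPORTED  'u'

/* True only for the two object types a schema publication can carry. */
bool
objtype_valid_for_schema_publication(char objtype)
{
    return objtype == OBJTYPE_TABLE || objtype == OBJTYPE_SEQUENCE;
}

/*
 * Label for use in messages such as "... for object type %s".
 * Callers that may see 'u' have to filter it out explicitly beforehand;
 * the assert documents that contract.
 */
const char *
objtype_label(char objtype)
{
    assert(objtype_valid_for_schema_publication(objtype));

    return objtype == OBJTYPE_TABLE ? "table" : "sequence";
}
```

Keeping 'u' out of the validity check means every caller that can encounter an unsupported relkind has to decide for itself what to do with it, which is what the quoted comment asks for.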
@@ -18826,6 +18867,30 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
                         errmsg("invalid schema name"),
                         parser_errposition(pubobj->location));
         }
+        else if (pubobj->pubobjtype == PUBLICATIONOBJ_SEQUENCES_IN_SCHEMA ||
+                 pubobj->pubobjtype == PUBLICATIONOBJ_SEQUENCES_IN_CUR_SCHEMA)
+        {
+            /* WHERE clause is not allowed on a schema object */
+            if (pubobj->pubtable && pubobj->pubtable->whereClause)
+                ereport(ERROR,
+                        errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("WHERE clause not allowed for schema"),
+                        parser_errposition(pubobj->location));

The grammar doesn't allow specifying a whereClause with the ALL TABLES IN SCHEMA specification, but we have code to throw an error if that happens. We also have similar code for ALL SEQUENCES IN SCHEMA. Should we add it for the SEQUENCE specification as well?

+static void
+fetch_sequence_data(char *nspname, char *relname,
... snip ...
+    /* tablesync sets the sequences in non-transactional way */
+    SetSequence(RelationGetRelid(rel), false, last_value, log_cnt, is_called);

Why? In the case of a regular table, if the sync fails, the table will retain its state from before the sync. Similarly, it will be expected that the sequence retains its state from before the sync, no?

@@ -1467,10 +1557,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)

Now that it syncs sequences as well, should we rename this as LogicalRepSyncRelationStart?

+static void
+apply_handle_sequence(StringInfo s)
... snip ...
+    /*
+     * Commit the per-stream transaction (we only do this when not in
+     * remote transaction, i.e. for non-transactional sequence updates.)
+     */
+    if (!in_remote_transaction)
+        CommitTransactionCommand();

I understand the purpose of the if block. It commits the transaction that was started when applying a non-transactional sequence change. But I didn't understand the term "per-stream transaction".

@@ -5683,8 +5686,15 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc)

Thanks for the additional comments. Those are useful.
@@ -1716,28 +1716,19 @@ describeOneTableDetails(const char *schemaname,

I think these changes make it easy to print the publication description per the code changes later. But maybe we should commit the refactoring patch separately.

-DECLARE_UNIQUE_INDEX(pg_publication_namespace_pnnspid_pnpubid_index, 6239, PublicationNamespacePnnspidPnpubidIndexId, on pg_publication_namespace using btree(pnnspid oid_ops, pnpubid oid_ops));
+DECLARE_UNIQUE_INDEX(pg_publication_namespace_pnnspid_pnpubid_pntype_index, 8903, PublicationNamespacePnnspidPnpubidPntypeIndexId, on pg_publication_namespace using btree(pnnspid oid_ops, pnpubid oid_ops, pntype char_ops));

Why do we need a new OID? The old index should not be there in a cluster created using this version and hence this OID will not be used.

[1] https://www.postgresql.org/docs/current/functions-sequence.html

Next I will review 0005.

--
Best Wishes,
Ashutosh Bapat