Re: Add an option to skip loading missing publication to avoid logical replication failure
On Mon, 19 Feb 2024 at 12:48, vignesh C wrote: > > Hi, > > Currently ALTER SUBSCRIPTION ... SET PUBLICATION will break the > logical replication in certain cases. This can happen as the apply > worker will get restarted after SET PUBLICATION, the apply worker will > use the existing slot and replication origin corresponding to the > subscription. Now, it is possible that before restart the origin has > not been updated and the WAL start location points to a location prior > to where PUBLICATION pub exists which can lead to such an error. Once > this error occurs, apply worker will never be able to proceed and will > always return the same error. > > There was discussion on this and Amit had posted a patch to handle > this at [2]. Amit's patch does continue using a historic snapshot but > ignores publications that are not found for the purpose of computing > RelSyncEntry attributes. We won't mark such an entry as valid till all > the publications are loaded without anything missing. This means we > won't publish operations on tables corresponding to that publication > till we found such a publication and that seems okay. > I have added an option skip_not_exist_publication to enable this > operation only when skip_not_exist_publication is specified as true. > There is no change in default behavior when skip_not_exist_publication > is specified as false. I have updated the patch to now include changes for pg_dump, added few tests, describe changes and added documentation changes. The attached v2 version patch has the changes for the same. Regards, Vignesh From c0f97fc671db390239ca5cb4c224cb3d80a0e22e Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Mon, 19 Feb 2024 10:20:02 +0530 Subject: [PATCH v2 1/2] Skip loading the publication if the publication does not exist. Skip loading the publication if the publication does not exist. --- src/backend/replication/pgoutput/pgoutput.c | 28 +++-- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 998f92d671..f7b6d0384d 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -82,7 +82,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, static bool publications_valid; -static List *LoadPublications(List *pubnames); +static List *LoadPublications(List *pubnames, bool *skipped); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, @@ -1703,9 +1703,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we just skip the publications that don't exist yet. 'skipped' + * will be true if we find any publication from the given list that doesn't + * exist. */ static List * -LoadPublications(List *pubnames) +LoadPublications(List *pubnames, bool *skipped) { List *result = NIL; ListCell *lc; @@ -1713,9 +1717,12 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + *skipped = true; } return result; @@ -1994,7 +2001,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) } /* Validate the entry */ - if (!entry->replicate_valid) + if (!entry->replicate_valid || !publications_valid) { Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); @@ -2011,6 +2018,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); List *rel_publications = NIL; + bool skipped_pub = false; /* Reload publications if needed before use. */ if (!publications_valid) @@ -2021,9 +2029,15 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) list_free_deep(data->publications); data->publications = NIL; } - data->publications = LoadPublications(data->publication_names); + data->publications = LoadPublications(data->publication_names, &skipped_pub); MemoryContextSwitchTo(oldctx); - publications_valid = true; + + /* + * We don't consider the publications to be valid till we have + * information of all the publications. + */ + if (!skipped_pub) +publications_valid = true; } /* -- 2.34.1 From e9ff5304b2a3a0fccfb9084ff076a0465f28edbe Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Mon, 19 Feb 2024 10:23:05 +0530 Subject: [PATCH v2 2/2] Added an option skip_not_exist_publication which will skip loading the publication, if the publication does not exist. Added an option skip_not_exist_publication which will skip
Add an option to skip loading missing publication to avoid logical replication failure
Hi, Currently ALTER SUBSCRIPTION ... SET PUBLICATION will break the logical replication in certain cases. This can happen as the apply worker will get restarted after SET PUBLICATION, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before restart the origin has not been updated and the WAL start location points to a location prior to where PUBLICATION pub exists which can lead to such an error. Once this error occurs, apply worker will never be able to proceed and will always return the same error. There was discussion on this and Amit had posted a patch to handle this at [2]. Amit's patch does continue using a historic snapshot but ignores publications that are not found for the purpose of computing RelSyncEntry attributes. We won't mark such an entry as valid till all the publications are loaded without anything missing. This means we won't publish operations on tables corresponding to that publication till we found such a publication and that seems okay. I have added an option skip_not_exist_publication to enable this operation only when skip_not_exist_publication is specified as true. There is no change in default behavior when skip_not_exist_publication is specified as false. But one thing to note with the patch (with skip_not_exist_publication option) is that replication of few WAL entries will be skipped till the publication is loaded like in the below example: -- Create table in publisher and subscriber create table t1(c1 int); create table t2(c1 int); -- Create publications create publication pub1 for table t1; create publication pub2 for table t2; -- Create subscription create subscription test1 connection 'dbname=postgres host=localhost port=5432' publication pub1, pub2; -- Drop one publication drop publication pub1; -- Insert in the publisher insert into t1 values(11); insert into t2 values(21); -- Select in subscriber postgres=# select * from t1; c1 (0 rows) postgres=# select * from t2; c1 21 (1 row) -- Create the dropped publication in publisher create publication pub1 for table t1; -- Insert in the publisher insert into t1 values(12); postgres=# select * from t1; c1 11 12 (2 rows) -- Select data in subscriber postgres=# select * from t1; -- record with value 11 will be missing in subscriber c1 12 (1 row) Thoughts? [1] - https://www.postgresql.org/message-id/CAA4eK1%2BT-ETXeRM4DHWzGxBpKafLCp__5bPA_QZfFQp7-0wj4Q%40mail.gmail.com Regards, Vignesh From 9b79dee26554ae4af9ff00948ab185482b071ba8 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Mon, 19 Feb 2024 10:20:02 +0530 Subject: [PATCH v1 1/2] Skip loading the publication if the publication does not exist. Skip loading the publication if the publication does not exist. --- src/backend/replication/pgoutput/pgoutput.c | 28 +++-- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 998f92d671..f7b6d0384d 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -82,7 +82,7 @@ static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, static bool publications_valid; -static List *LoadPublications(List *pubnames); +static List *LoadPublications(List *pubnames, bool *skipped); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, @@ -1703,9 +1703,13 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we just skip the publications that don't exist yet. 'skipped' + * will be true if we find any publication from the given list that doesn't + * exist. */ static List * -LoadPublications(List *pubnames) +LoadPublications(List *pubnames, bool *skipped) { List *result = NIL; ListCell *lc; @@ -1713,9 +1717,12 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + *skipped = true; } return result; @@ -1994,7 +2001,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) } /* Validate the entry */ - if (!entry->replicate_valid) + if (!entry->replicate_valid || !publications_valid) { Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); @@ -2011,6 +2018,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); List *rel_publications = NIL; + bool skipped_pub = false; /* Reload publications if needed before use. *