On 13/06/17 21:49, Peter Eisentraut wrote:
> On 6/13/17 02:33, Noah Misch wrote:
>>> Steps to reproduce -
>>> X cluster -> create 100 tables , publish all tables (create publication pub
>>> for all tables);
>>> Y Cluster -> create 100 tables ,create subscription(create subscription sub
>>> connection 'user=centos host=localhost' publication pub;
>>> Y cluster ->drop subscription - drop subscription sub;
>>>
>>> check the log file on Y cluster.
>>>
>>> Sometime , i have seen this error on psql prompt and drop subscription
>>> operation got failed at first attempt.
>>>
>>> postgres=# drop subscription sub;
>>> ERROR: tuple concurrently updated
>>> postgres=# drop subscription sub;
>>> NOTICE: dropped replication slot "sub" on publisher
>>> DROP SUBSCRIPTION
>>
>> [Action required within three days. This is a generic notification.]
>
> It's being worked on. Let's see by Thursday.
>
The attached patch fixes it (it was mostly about the order of calls). While I was changing it, I also split SetSubscriptionRelState into two separate interfaces: now that the update_only bool has been added, it has become quite strange to have a single interface for what are basically two separate functions.

There are still a couple of remaining issues from this thread, though. Namely, the AccessExclusiveLock on the pg_subscription catalog, which is not very pretty, but we need a way to block the launcher from accessing a subscription that is being dropped and to make sure it will not start new workers for it afterwards. The question is how, because by the time the launcher can lock an individual subscription, it is already processing it. So it looks to me like we'd need to reread the catalog with a new snapshot after the lock is acquired, which seems a bit wasteful (I wonder if we could just call AcceptInvalidationMessages and refetch from the syscache). Any better ideas?

Another related problem is the locking of subscriptions during operations on them; AlterSubscription in particular seems like it should lock the subscription itself. I did that in 0002.

-- 
Petr Jelinek                  http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
From 005eeb820f4e0528513744136582c4489e2429e3 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Wed, 14 Jun 2017 08:14:20 +0200
Subject: [PATCH 2/2] Lock subscription when altering it

---
 src/backend/commands/subscriptioncmds.c | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 14c8f3f..e0ec8ea 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -642,6 +642,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 					   stmt->subname);
 
 	subid = HeapTupleGetOid(tup);
+
+	/*
+	 * Lock the subscription so nobody else can do anything with it (including
+	 * the replication workers).
+	 */
+	LockSharedObject(SubscriptionRelationId, subid, 0, AccessExclusiveLock);
+
 	sub = GetSubscription(subid, false);
 
 	/* Form a new tuple. */
-- 
2.7.4
From 9011698ae800e0f45f960e91f6b16eab15634fac Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Tue, 13 Jun 2017 19:26:51 +0200
Subject: [PATCH 1/2] Improve the pg_subscription_rel handling

Split the SetSubscriptionRelState into separate Add and Update functions,
removing the unsafe upsert logic, as callers are supposed to know whether
they are updating or adding a new row.

Reorder the code in the above-mentioned functions to avoid "tuple
concurrently updated" errors.
---
 src/backend/catalog/pg_subscription.c       | 131 +++++++++++++++-------------
 src/backend/commands/subscriptioncmds.c     |  14 +--
 src/backend/replication/logical/tablesync.c |  33 ++++---
 src/include/catalog/pg_subscription_rel.h   |   6 +-
 4 files changed, 98 insertions(+), 86 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c5b2541..fd19675 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -225,24 +225,15 @@ textarray_to_stringlist(ArrayType *textarray)
 }
 
 /*
- * Set the state of a subscription table.
- *
- * If update_only is true and the record for given table doesn't exist, do
- * nothing. This can be used to avoid inserting a new record that was deleted
- * by someone else. Generally, subscription DDL commands should use false,
- * workers should use true.
- *
- * The insert-or-update logic in this function is not concurrency safe so it
- * might raise an error in rare circumstances. But if we took a stronger lock
- * such as ShareRowExclusiveLock, we would risk more deadlocks.
+ * Add new state record for a subscription table.
  */
 Oid
-SetSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn, bool update_only)
+AddSubscriptionRelState(Oid subid, Oid relid, char state,
+						XLogRecPtr sublsn)
 {
 	Relation	rel;
 	HeapTuple	tup;
-	Oid			subrelid = InvalidOid;
+	Oid			subrelid;
 	bool		nulls[Natts_pg_subscription_rel];
 	Datum		values[Natts_pg_subscription_rel];
 
@@ -252,57 +243,79 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state,
 	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
 							  ObjectIdGetDatum(relid),
 							  ObjectIdGetDatum(subid));
+	if (HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u already exists",
+			 relid, subid);
 
-	/*
-	 * If the record for given table does not exist yet create new record,
-	 * otherwise update the existing one.
-	 */
-	if (!HeapTupleIsValid(tup) && !update_only)
-	{
-		/* Form the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
-		values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
-
-		tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
-
-		/* Insert tuple into catalog. */
-		subrelid = CatalogTupleInsert(rel, tup);
-
-		heap_freetuple(tup);
-	}
-	else if (HeapTupleIsValid(tup))
-	{
-		bool		replaces[Natts_pg_subscription_rel];
+	/* Form the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
+	values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
-		/* Update the tuple. */
-		memset(values, 0, sizeof(values));
-		memset(nulls, false, sizeof(nulls));
-		memset(replaces, false, sizeof(replaces));
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
-		replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
-		values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+	/* Insert tuple into catalog. */
+	subrelid = CatalogTupleInsert(rel, tup);
 
-		replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
-		if (sublsn != InvalidXLogRecPtr)
-			values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
-		else
-			nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	heap_freetuple(tup);
 
-		tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
-								replaces);
+	/* Cleanup. */
+	heap_close(rel, NoLock);
 
-		/* Update the catalog. */
-		CatalogTupleUpdate(rel, &tup->t_self, tup);
+	return subrelid;
+}
 
-		subrelid = HeapTupleGetOid(tup);
-	}
+/*
+ * Update the state of a subscription table.
+ */
+Oid
+UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Oid			subrelid;
+	bool		nulls[Natts_pg_subscription_rel];
+	Datum		values[Natts_pg_subscription_rel];
+	bool		replaces[Natts_pg_subscription_rel];
+
+	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
+
+	/* Try finding existing mapping. */
+	tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
+							  ObjectIdGetDatum(relid),
+							  ObjectIdGetDatum(subid));
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "subscription table %u in subscription %u does not exist",
+			 relid, subid);
+
+	/* Update the tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
+	values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+	replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
+	if (sublsn != InvalidXLogRecPtr)
+		values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
+	else
+		nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	subrelid = HeapTupleGetOid(tup);
 
 	/* Cleanup. */
 	heap_close(rel, NoLock);
@@ -377,6 +390,8 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	HeapTuple	tup;
 	int			nkeys = 0;
 
+	Assert(OidIsValid(subid) || OidIsValid(relid));
+
 	rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
 
 	if (OidIsValid(subid))
@@ -400,9 +415,7 @@ RemoveSubscriptionRel(Oid subid, Oid relid)
 	/* Do the search and delete what we found. */
 	scan = heap_beginscan_catalog(rel, nkeys, skey);
 	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
-	{
 		simple_heap_delete(rel, &tup->t_self);
-	}
 	heap_endscan(scan);
 
 	heap_close(rel, RowExclusiveLock);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5aae7b6..14c8f3f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -450,8 +450,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 				CheckSubscriptionRelkind(get_rel_relkind(relid),
 										 rv->schemaname, rv->relname);
 
-				SetSubscriptionRelState(subid, relid, table_state,
-										InvalidXLogRecPtr, false);
+				AddSubscriptionRelState(subid, relid, table_state,
+										InvalidXLogRecPtr);
 			}
 
 			/*
@@ -569,9 +569,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
 		if (!bsearch(&relid, subrel_local_oids,
 					 list_length(subrel_states), sizeof(Oid), oid_cmp))
 		{
-			SetSubscriptionRelState(sub->oid, relid,
+			AddSubscriptionRelState(sub->oid, relid,
 									copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-									InvalidXLogRecPtr, false);
+									InvalidXLogRecPtr);
 			ereport(NOTICE,
 					(errmsg("added subscription for table %s.%s",
 							quote_identifier(rv->schemaname),
@@ -906,15 +906,15 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 
 	ReleaseSysCache(tup);
 
+	/* Kill the apply worker associated with the subscription. */
+	logicalrep_worker_stop(subid, InvalidOid);
+
 	/* Clean up dependencies */
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
 	RemoveSubscriptionRel(subid, InvalidOid);
 
-	/* Kill the apply worker so that the slot becomes accessible. */
-	logicalrep_worker_stop(subid, InvalidOid);
-
 	/* Remove the origin tracking if exists. */
 	snprintf(originname, sizeof(originname), "pg_%u", subid);
 	originid = replorigin_by_name(originname, true);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 3ff08bf..28accda 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -285,11 +285,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 
 		SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-		SetSubscriptionRelState(MyLogicalRepWorker->subid,
-								MyLogicalRepWorker->relid,
-								MyLogicalRepWorker->relstate,
-								MyLogicalRepWorker->relstate_lsn,
-								true);
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+								   MyLogicalRepWorker->relid,
+								   MyLogicalRepWorker->relstate,
+								   MyLogicalRepWorker->relstate_lsn);
 
 		walrcv_endstreaming(wrconn, &tli);
 		finish_sync_worker();
@@ -414,9 +413,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 					StartTransactionCommand();
 					started_tx = true;
 				}
-				SetSubscriptionRelState(MyLogicalRepWorker->subid,
-										rstate->relid, rstate->state,
-										rstate->lsn, true);
+				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+										   rstate->relid, rstate->state,
+										   rstate->lsn);
 			}
 		}
 		else
@@ -844,11 +843,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 		/* Update the state and make it visible to others. */
 		StartTransactionCommand();
-		SetSubscriptionRelState(MyLogicalRepWorker->subid,
-								MyLogicalRepWorker->relid,
-								MyLogicalRepWorker->relstate,
-								MyLogicalRepWorker->relstate_lsn,
-								true);
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+								   MyLogicalRepWorker->relid,
+								   MyLogicalRepWorker->relstate,
+								   MyLogicalRepWorker->relstate_lsn);
 		CommitTransactionCommand();
 		pgstat_report_stat(false);
 
@@ -933,11 +931,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 						 * Update the new state in catalog.  No need to bother
 						 * with the shmem state as we are exiting for good.
 						 */
-						SetSubscriptionRelState(MyLogicalRepWorker->subid,
-												MyLogicalRepWorker->relid,
-												SUBREL_STATE_SYNCDONE,
-												*origin_startpos,
-												true);
+						UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+												   MyLogicalRepWorker->relid,
+												   SUBREL_STATE_SYNCDONE,
+												   *origin_startpos);
 						finish_sync_worker();
 					}
 					break;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index f5f6191..e6a2dd5 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -70,8 +70,10 @@ typedef struct SubscriptionRelState
 	char		state;
 } SubscriptionRelState;
 
-extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
-						XLogRecPtr sublsn, bool update_only);
+extern Oid AddSubscriptionRelState(Oid subid, Oid relid, char state,
+						XLogRecPtr sublsn);
+extern Oid UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
+						   XLogRecPtr sublsn);
 extern char GetSubscriptionRelState(Oid subid, Oid relid,
 						XLogRecPtr *sublsn, bool missing_ok);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
-- 
2.7.4
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers