On 17/12/16 18:34, Steve Singer wrote:
> On 12/16/2016 07:49 AM, Petr Jelinek wrote:
>> Hi,
>>
>> attached is version 13 of the patch.
>>
>> I merged in the changes from PeterE and made the following changes:
>> - fixed the ownership error messages for both provider and subscriber
>> - added ability to send invalidation message to invalidate whole
>> relcache and use it in publication code
>> - added the post creation/alter/drop hooks
>> - removed parts of docs that refer to initial sync (which does not exist
>> yet)
>> - added timeout handling/retry, etc to apply/launcher based on the GUCs
>> that exist for wal receiver (they could use renaming though)
>> - improved feedback behavior
>> - apply worker now uses owner of the subscription as connection user
>> - more tests
>> - check for max_replication_slots in launcher
>> - clarify the update 'K' sub-message description in protocol
>
> A few things I've noticed so far
>
> If I shut down the publisher I see the following in the log
>
> 2016-12-17 11:33:49.548 EST [1891] LOG: worker process: ?)G? (PID 1987)
> exited with exit code 1
>
> but then if I shut down the subscriber postmaster and restart it, the message switches to
> 2016-12-17 11:43:09.628 EST [2373] LOG: worker process: ???? (PID 2393)
> exited with exit code 1
>
> Not sure where the 'G' was coming from (other times I have seen an 'I'
> here or other random characters)
>
That was caused by an uninitialized bgw_name for the apply worker. Rather silly bug; fixed.
>
> I don't think we are cleaning up subscriptions on a drop database
>
> If I do the following
>
> 1) Create a subscription in a new database
> 2) Stop the publisher
> 3) Drop the database on the subscriber
>
> test=# create subscription mysuba connection 'host=localhost dbname=test
> port=5440' publication mypub;
> test=# \c b
> b=# drop database test;
> DROP DATABASE
> b=# select * FROM pg_subscription ;
> subdbid | subname | subowner | subenabled | subconninfo |
> subslotname | subpublications
> ---------+---------+----------+------------+--------------------------------------+-------------+-----------------
>
> 16384 | mysuba | 10 | t | host=localhost dbname=test
> port=5440 | mysuba | {mypub}
>
Good one. I added a check that prevents dropping a database when there is
a subscription defined for it. I think we can't cascade here, as the
subscription may or may not hold resources (a slot) in another
instance/database, so preventing the drop is the best we can do.
>
> Also I don't think I can now drop mysuba
> b=# drop subscription mysuba;
> ERROR: subscription "mysuba" does not exist
>
Yeah, subscriptions are per database, so mysuba cannot be found from
database b.
I don't want to make a v14 just for these 2 changes, as that would make
life harder for anybody code-reviewing v13, so attached is a diff with
the above fixes that applies on top of v13.
--
Petr Jelinek http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 8be9f39..2c58cc6 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -107,7 +107,41 @@ GetSubscription(Oid subid, bool missing_ok)
}
/*
- * Free memory allocated by subscription struct. */
+ * Return number of subscriptions defined in given database.
+ * Used by dropdb() to check if database can indeed be dropped.
+ */
+int
+CountDBSubscriptions(Oid dbid)
+{
+ int nsubs = 0;
+ Relation rel;
+ ScanKeyData scankey;
+ SysScanDesc scan;
+ HeapTuple tup;
+
+ rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
+
+ ScanKeyInit(&scankey,
+ Anum_pg_subscription_subdbid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(dbid));
+
+ scan = systable_beginscan(rel, InvalidOid, false,
+ NULL, 1, &scankey);
+
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ nsubs++;
+
+ systable_endscan(scan);
+
+ heap_close(rel, NoLock);
+
+ return nsubs;
+}
+
+/*
+ * Free memory allocated by subscription struct.
+ */
void
FreeSubscription(Subscription *sub)
{
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 0919ad8..45d152c 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -37,6 +37,7 @@
#include "catalog/pg_authid.h"
#include "catalog/pg_database.h"
#include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_subscription.h"
#include "catalog/pg_tablespace.h"
#include "commands/comment.h"
#include "commands/dbcommands.h"
@@ -790,6 +791,7 @@ dropdb(const char *dbname, bool missing_ok)
int npreparedxacts;
int nslots,
nslots_active;
+ int nsubscriptions;
/*
* Look up the target database's OID, and get exclusive lock on it. We
@@ -875,6 +877,21 @@ dropdb(const char *dbname, bool missing_ok)
errdetail_busy_db(notherbackends, npreparedxacts)));
/*
+ * Check if there are subscriptions defined in the target database.
+ *
+ * We can't drop them automatically because they might be holding
+ * resources in other databases/instances.
+ */
+ if ((nsubscriptions = CountDBSubscriptions(db_id)) > 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("database \"%s\" is being used by logical replication subscription",
+ dbname),
+ errdetail_plural("There is %d subscription.",
+ "There are %d subscriptions.",
+ nsubscriptions, nsubscriptions)));
+
+ /*
* Remove the database's tuple from pg_database.
*/
tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index fe30dda..bd865ef 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -370,6 +370,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt)
InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
+ ApplyLauncherWakeupAtCommit();
+
return myself;
}
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 0f04cb3..783d97e 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -280,6 +280,8 @@ logicalrep_worker_launch(Oid dbid, Oid subid, Oid userid)
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
bgw.bgw_main = ApplyWorkerMain;
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication worker %u", subid);
bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProcPid;
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index f6a3bac..057b36e 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -77,4 +77,6 @@ extern Subscription *GetSubscription(Oid subid, bool missing_ok);
extern void FreeSubscription(Subscription *sub);
extern Oid get_subscription_oid(const char *subname, bool missing_ok);
+extern int CountDBSubscriptions(Oid dbid);
+
#endif /* PG_SUBSCRIPTION_H */
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers