Hello,

At Sun, 23 Apr 2017 00:51:52 +0900, Masahiko Sawada <sawada.m...@gmail.com> wrote in <CAD21AoApBU+nz7FYaWX6gjyB9r6WWrTujbL4rrZiocHLc=p...@mail.gmail.com>
> On Fri, Apr 21, 2017 at 11:19 PM, Masahiko Sawada <sawada.m...@gmail.com>
> wrote:
> > On Fri, Apr 21, 2017 at 5:33 PM, Kyotaro HORIGUCHI
> > <horiguchi.kyot...@lab.ntt.co.jp> wrote:
> >> Hello,
> >>
> >> At Thu, 20 Apr 2017 13:21:14 +0900, Masahiko Sawada
> >> <sawada.m...@gmail.com> wrote in
> >> <CAD21AoDrw0OaHE=ovrrhqx248kjj7w+1vim3k76ap46hnhj...@mail.gmail.com>
> >>> On Thu, Apr 20, 2017 at 12:30 AM, Petr Jelinek
> >>> <petr.jeli...@2ndquadrant.com> wrote:
> >>> > On 19/04/17 15:57, Masahiko Sawada wrote:
> >>> >> On Wed, Apr 19, 2017 at 10:07 PM, Petr Jelinek
> >>> >> <petr.jeli...@2ndquadrant.com> wrote:
> >>> >> Yeah, sorry, I meant that we don't want to wait but cannot launch the
> >>> >> tablesync worker in such a case.
> >>> >>
> >>> >> But after more thought, the latest patch Peter proposed has the same
> >>> >> problem. Perhaps we need to always scan the whole pg_subscription_rel
> >>> >> and remove the entry if the corresponding table gets synced.
> >>> >>
> >>> >
> >>> > Yes that's what I mean by "Why can't we just update the hashtable based
> >>> > on the catalog". And if we do that then I don't understand why do we
> >>> > need both hashtable and linked list if we need to update both based on
> >>> > catalog reads anyway.
> >>>
> >>> Thanks, I've now understood correctly. Yes, I think you're right. If
> >>> we update the hash table based on the catalog whenever table state is
> >>> invalidated, we don't need to have both hash table and list.
> >>
> >> Ah, ok. The patch from Peter is still generating and replacing the
> >> content of the list. The attached patch stores everything into
> >> SubscriptionRelState. Opposite to my anticipation, the hash can
> >> be effectively kept small and removed.
> >>
> >
> > Thank you for the patch!
> > Actually, I also bumped into the same situation where we got the
> > following error during hash_seq_search. I guess we cannot commit a
> > transaction during hash_seq_scan but the sequential scan loop in
> > process_syncing_tables_for_apply could attempt to do that.

Ah. Thanks. I forgot that I saw the same error once. The hash has
nothing to do with any transactions. The scan now runs after the
hash is frozen.

Petr:
> I think we should document why this is done this way - I mean it's not
> very obvious that it's okay to update just when not found and why and
> even less obvious that it would be in fact wrong to update already
> existing entry.

It was not carefully designed. I changed that part; it now seems
more reasonable, maybe.

Petr:
> I also think that in it's current form the
> GetSubscriptionNotReadyRelations should be moved to tablesync.c

Removed it from pg_subscription.c and moved it to tablesync.c.

Petr:
> I also think we can't add the new fields to
> SubscriptionRelState directly as that's used by catalog access
> functions as well.

Yeah, that was my laziness. A new struct SubscriptionWorkerState is
added in tablesync.c and the interval stuff runs on it.

At Sun, 23 Apr 2017 00:51:52 +0900, Masahiko Sawada <sawada.m...@gmail.com> wrote in <CAD21AoApBU+nz7FYaWX6gjyB9r6WWrTujbL4rrZiocHLc=p...@mail.gmail.com>
> So I guess we should commit the changing status to SUBREL_STATE_READY
> after finished hash_seq_scan.

We could do that as well, but I thought that a hash could be
explicitly "unfrozen" without a side effect. The first attached
patch adds such a function to dynahash.c. I don't think that just
allowing unregistered scans on an unfrozen hash is worse than this.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
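
For readers following the "update only when not found" discussion above, here is a minimal stand-alone sketch of the mark-and-sweep idea: fields that mirror the catalog are overwritten on every refresh, while local-only fields are initialized only when an entry is first created. It is not PostgreSQL code. `ToyEntry`, `ToyCatalogRow`, `toy_lookup`, `toy_refresh` and `toy_sweep` are hypothetical names, and a small fixed array stands in for the dynahash table.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_ENTRIES 8

/* Hypothetical stand-in for one row of pg_subscription_rel. */
typedef struct ToyCatalogRow
{
	unsigned int relid;
	char		state;
} ToyCatalogRow;

/* Hypothetical stand-in for SubscriptionWorkerState. */
typedef struct ToyEntry
{
	bool		used;				/* slot is occupied */
	unsigned int relid;				/* key, never changes once set */
	char		state;				/* mirrors the catalog, refreshed every time */
	long		last_worker_start;	/* local only, must survive refreshes */
	bool		alive;				/* seen in the catalog by the last refresh */
} ToyEntry;

/* Return the entry for relid, or NULL if there is none. */
static ToyEntry *
toy_lookup(ToyEntry *tab, unsigned int relid)
{
	for (int i = 0; i < TOY_MAX_ENTRIES; i++)
		if (tab[i].used && tab[i].relid == relid)
			return &tab[i];
	return NULL;
}

/* Mark phase: merge catalog rows into the table; returns the live count. */
static int
toy_refresh(ToyEntry *tab, const ToyCatalogRow *rows, int nrows)
{
	int			nlive = 0;

	for (int r = 0; r < nrows; r++)
	{
		ToyEntry   *e = toy_lookup(tab, rows[r].relid);

		if (e == NULL)
		{
			/* New relation: take a free slot, initialize local-only fields. */
			for (int i = 0; i < TOY_MAX_ENTRIES && e == NULL; i++)
				if (!tab[i].used)
					e = &tab[i];
			assert(e != NULL);	/* the toy table is assumed large enough */
			e->used = true;
			e->relid = rows[r].relid;
			e->last_worker_start = 0;
			e->alive = false;
		}

		/* Catalog-derived fields are overwritten on every refresh. */
		e->state = rows[r].state;
		e->alive = true;
		nlive++;
	}
	return nlive;
}

/* Sweep phase: drop entries the refresh did not see, reset the flag. */
static int
toy_sweep(ToyEntry *tab)
{
	int			nkept = 0;

	for (int i = 0; i < TOY_MAX_ENTRIES; i++)
	{
		if (!tab[i].used)
			continue;
		if (!tab[i].alive)
		{
			tab[i].used = false;	/* relation is gone or became ready */
			continue;
		}
		tab[i].alive = false;		/* must be re-marked by the next refresh */
		nkept++;
	}
	return nkept;
}
```

Note that the sweep is only meaningful right after a refresh. The patch guards it with local_state_updated for that reason; the sketch leaves that guard to the caller.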

From dcd8ad43f3dceac4018bfb0819d37190a1fd8ef8 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 24 Apr 2017 16:06:10 +0900
Subject: [PATCH 1/2] Add unfrozen feature to dynahash

Scans on an unfrozen hash cannot live beyond a transaction lifetime. A
hash can be frozen to allow a modification-free sequential scan which
can live beyond transactions, but there's no means to release that
status. This patch adds a new function hash_unfreeze to allow us to
reset frozen hashes to an insertable state.
---
 src/backend/utils/hash/dynahash.c | 20 ++++++++++++++++++++
 src/include/utils/hsearch.h       |  1 +
 2 files changed, 21 insertions(+)

diff --git a/src/backend/utils/hash/dynahash.c b/src/backend/utils/hash/dynahash.c
index 12b1658..073e2bc 100644
--- a/src/backend/utils/hash/dynahash.c
+++ b/src/backend/utils/hash/dynahash.c
@@ -1334,6 +1334,9 @@ hash_get_num_entries(HTAB *hashp)
  * wherein it is inconvenient to track whether a scan is still open, and
  * there's no possibility of further insertions after readout has begun.
  *
+ * NOTE: it is possible to unfreeze a frozen hash. It is the scanners'
+ * responsibility to abandon all running sequential scans before that.
+ *
  * NOTE: to use this with a partitioned hashtable, caller had better hold
  * at least shared lock on all partitions of the table throughout the scan!
  * We can cope with insertions or deletions by our own backend, but *not*
@@ -1456,6 +1459,23 @@ hash_freeze(HTAB *hashp)
 	hashp->frozen = true;
 }
 
+/*
+ * hash_unfreeze
+ *			Reset the freeze state of a hashtable
+ *
+ * This re-enables insertion into the hash. Active sequential scans started
+ * during the freeze must not continue after this call.
+ */
+void
+hash_unfreeze(HTAB *hashp)
+{
+	Assert(!hashp->isshared);
+	if (!hashp->frozen)
+		elog(ERROR, "hashtable \"%s\" is not frozen",
+			 hashp->tabname);
+	hashp->frozen = false;
+}
+
 
 /********************************* UTILITIES ************************/
 
diff --git a/src/include/utils/hsearch.h b/src/include/utils/hsearch.h
index 7964087..60920d4 100644
--- a/src/include/utils/hsearch.h
+++ b/src/include/utils/hsearch.h
@@ -136,6 +136,7 @@ extern void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp);
 extern void *hash_seq_search(HASH_SEQ_STATUS *status);
 extern void hash_seq_term(HASH_SEQ_STATUS *status);
 extern void hash_freeze(HTAB *hashp);
+extern void hash_unfreeze(HTAB *hashp);
 extern Size hash_estimate_size(long num_entries, Size entrysize);
 extern long hash_select_dirsize(long num_entries);
 extern Size hash_get_shared_size(HASHCTL *info, int flags);
-- 
2.9.2
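
The freeze/unfreeze semantics this patch relies on can be sketched with a toy model. This is only an illustration under stated assumptions, not dynahash itself: `ToyTable` and the `toy_*` functions are hypothetical, errors are modelled as `false` return values instead of elog(ERROR), and the real hash_freeze additionally refuses to freeze a table with active scans. The point is that a scan started on a frozen table is not registered, so it is not reported as leaked when a transaction ends inside the scan loop.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical toy model of the relevant dynahash state. */
typedef struct ToyTable
{
	int			nentries;			/* number of stored entries */
	int			registered_scans;	/* scans tracked for end-of-xact checks */
	bool		frozen;				/* insertions forbidden while true */
} ToyTable;

/* Insertion fails on a frozen table (the real code raises an error). */
static bool
toy_insert(ToyTable *t)
{
	if (t->frozen)
		return false;
	t->nentries++;
	return true;
}

/* Only scans on an unfrozen table are registered for leak detection. */
static void
toy_seq_init(ToyTable *t)
{
	if (!t->frozen)
		t->registered_scans++;
}

/* At transaction end, report and forget scans that are still registered. */
static int
toy_end_of_xact(ToyTable *t)
{
	int			leaked = t->registered_scans;

	t->registered_scans = 0;
	return leaked;
}

static void
toy_freeze(ToyTable *t)
{
	t->frozen = true;
}

/* Mirrors the proposed hash_unfreeze: it is an error if not frozen. */
static bool
toy_unfreeze(ToyTable *t)
{
	if (!t->frozen)
		return false;
	t->frozen = false;
	return true;
}
```

In this model, committing in the middle of a scan on an unfrozen table reports one leaked scan, while the same sequence on a frozen table reports none; after toy_unfreeze the table accepts insertions again.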

From b1c86c37d442f9a90330df2f47a0680fa478ccde Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 24 Apr 2017 13:39:17 +0900
Subject: [PATCH 2/2] Wait between tablesync worker restarts

Before restarting a tablesync worker for the same relation, wait
wal_retrieve_retry_interval. This avoids restarting failing workers
in a tight loop.
---
 src/backend/catalog/pg_subscription.c       |  52 -------
 src/backend/replication/logical/tablesync.c | 205 +++++++++++++++++++++++-----
 src/include/catalog/pg_subscription_rel.h   |   1 -
 3 files changed, 172 insertions(+), 86 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a183850..ff1ed62 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -446,55 +446,3 @@ GetSubscriptionRelations(Oid subid)
 
 	return res;
 }
-
-/*
- * Get all relations for subscription that are not in a ready state.
- *
- * Returned list is palloced in current memory context.
- */
-List *
-GetSubscriptionNotReadyRelations(Oid subid)
-{
-	List	   *res = NIL;
-	Relation	rel;
-	HeapTuple	tup;
-	int			nkeys = 0;
-	ScanKeyData skey[2];
-	SysScanDesc scan;
-
-	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
-
-	ScanKeyInit(&skey[nkeys++],
-				Anum_pg_subscription_rel_srsubid,
-				BTEqualStrategyNumber, F_OIDEQ,
-				ObjectIdGetDatum(subid));
-
-	ScanKeyInit(&skey[nkeys++],
-				Anum_pg_subscription_rel_srsubstate,
-				BTEqualStrategyNumber, F_CHARNE,
-				CharGetDatum(SUBREL_STATE_READY));
-
-	scan = systable_beginscan(rel, InvalidOid, false,
-							  NULL, nkeys, skey);
-
-	while (HeapTupleIsValid(tup = systable_getnext(scan)))
-	{
-		Form_pg_subscription_rel	subrel;
-		SubscriptionRelState	   *relstate;
-
-		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
-
-		relstate = (SubscriptionRelState *)palloc(sizeof(SubscriptionRelState));
-		relstate->relid = subrel->srrelid;
-		relstate->state = subrel->srsubstate;
-		relstate->lsn = subrel->srsublsn;
-
-		res = lappend(res, relstate);
-	}
-
-	/* Cleanup */
-	systable_endscan(scan);
-	heap_close(rel, AccessShareLock);
-
-	return res;
-}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index b4b48d9..e10b258 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -103,9 +103,18 @@
 #include "storage/ipc.h"
 
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 
+/* Struct to track subscription worker startup */
+typedef struct SubscriptionWorkerState
+{
+	SubscriptionRelState rs;	/* key is rs.relid */
+	TimestampTz	last_worker_start;	/* time of the last launch of worker */
+	bool		alive;			/* this relid is alive in the catalog */
+} SubscriptionWorkerState;
+
 static bool table_states_valid = false;
 
 StringInfo	copybuf = NULL;
@@ -237,6 +246,80 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 }
 
 /*
+ * Get all relations for subscription that are not in a ready state.
+ *
+ * Updates the given hash and returns the number of live entries.
+ *
+ * Live entries are the hash entries for which a corresponding tuple is
+ * found in pg_subscription_rel.
+ *
+ */
+static int
+GetSubscriptionNotReadyRelations(struct HTAB *localstate, Oid subid)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	int			nkeys = 0;
+	ScanKeyData skey[2];
+	SysScanDesc scan;
+	int			ret = 0;
+
+	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&skey[nkeys++],
+				Anum_pg_subscription_rel_srsubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(subid));
+
+	ScanKeyInit(&skey[nkeys++],
+				Anum_pg_subscription_rel_srsubstate,
+				BTEqualStrategyNumber, F_CHARNE,
+				CharGetDatum(SUBREL_STATE_READY));
+
+	scan = systable_beginscan(rel, InvalidOid, false,
+							  NULL, nkeys, skey);
+
+	/* Update entries in the given hash */
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_subscription_rel	subrel;
+		SubscriptionWorkerState	   *wstate;
+		bool						found = false;
+
+		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+		wstate = hash_search(localstate, (void *) &subrel->srrelid,
+							 HASH_ENTER, &found);
+
+		/*
+		 * rs.relid is the key of this entry, which won't be changed. The
+		 * remaining fields should be initialized only the first time.
+		 */
+		if (!found)
+		{
+			wstate->rs.relid = subrel->srrelid;
+			wstate->last_worker_start = 0;
+			wstate->alive = false;
+		}
+
+		/* These two fields should track the status in catalog */
+		wstate->rs.lsn = subrel->srsublsn;
+		wstate->rs.state = subrel->srsubstate;
+
+		/* Set as alive since found in the catalog. */
+		Assert(!wstate->alive);
+		wstate->alive = true;
+		ret++;
+	}
+
+	/* Cleanup */
+	systable_endscan(scan);
+	heap_close(rel, AccessShareLock);
+
+	return ret;
+}
+
+/*
  * Handle table synchronization cooperation from the apply worker.
  *
  * Walk over all subscription tables that are individually tracked by the
@@ -245,7 +328,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  *
  * If there are tables that need synchronizing and are not being synchronized
  * yet, start sync workers for them (if there are free slots for sync
- * workers).
+ * workers). To prevent starting the sync worker for the same relation at a
+ * high frequency after a failure, we store its last start time with each sync
+ * state info. We start the sync worker for the same relation after waiting
+ * at least wal_retrieve_retry_interval.
  *
  * For tables that are being synchronized already, check if sync workers
  * either need action from the apply worker or have finished.
@@ -263,47 +349,87 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 static void
 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 {
-	static List *table_states = NIL;
-	ListCell   *lc;
+	static HTAB *subrel_local_state = NULL;
+	HASH_SEQ_STATUS hash_seq;
+	SubscriptionWorkerState *wstate;
+	bool		local_state_updated = false;
+	int			nrels = 0;
 
 	Assert(!IsTransactionState());
 
 	/* We need up to date sync state info for subscription tables here. */
 	if (!table_states_valid)
 	{
-		MemoryContext	oldctx;
-		List		   *rstates;
-		ListCell	   *lc;
-		SubscriptionRelState *rstate;
-
-		/* Clean the old list. */
-		list_free_deep(table_states);
-		table_states = NIL;
+		/* Create the local state hash if it does not exist */
+		if (!subrel_local_state)
+		{
+			HASHCTL		ctl;
+
+			memset(&ctl, 0, sizeof(ctl));
+			ctl.keysize = sizeof(Oid);
+			ctl.entrysize = sizeof(SubscriptionWorkerState);
+			subrel_local_state =
+				hash_create("Logical replication table sync worker state",
+							256, &ctl, HASH_ELEM | HASH_BLOBS);
+		}
 
 		StartTransactionCommand();
 
-		/* Fetch all non-ready tables. */
-		rstates	= GetSubscriptionNotReadyRelations(MySubscription->oid);
+		/* Update local state hash with non-ready tables. */
+		nrels = GetSubscriptionNotReadyRelations(subrel_local_state,
+												 MySubscription->oid);
+		local_state_updated = true;
+		CommitTransactionCommand();
 
-		/* Allocate the tracking info in a permanent memory context. */
-		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-		foreach(lc, rstates)
+		table_states_valid = true;
+	}
+	/* Just count the entries if the local state is not updated this time */
+	else if (subrel_local_state != NULL)
+		nrels = hash_get_num_entries(subrel_local_state);
+
+	/* No relation to be synced */
+	if (nrels == 0)
+	{
+		if (subrel_local_state != NULL)
 		{
-			rstate = palloc(sizeof(SubscriptionRelState));
-			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
-			table_states = lappend(table_states, rstate);
+			hash_destroy(subrel_local_state);
+			subrel_local_state = NULL;
 		}
-		MemoryContextSwitchTo(oldctx);
 
-		CommitTransactionCommand();
-
-		table_states_valid = true;
+		return;
 	}
 
-	/* Process all tables that are being synchronized. */
-	foreach(lc, table_states)
+	/*
+	 * Freeze hash table. This guarantees that the hash won't be broken for
+	 * the scan below. An error within this loop leaves the hash frozen, but
+	 * any such error blows away the worker process together with the
+	 * hash.
+	 */
+	hash_freeze(subrel_local_state);
+
+	/* Process all tables that are to be synchronized. */
+	hash_seq_init(&hash_seq, subrel_local_state);
+
+	while ((wstate = hash_seq_search(&hash_seq)) != NULL)
 	{
-		SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc);
+		SubscriptionRelState *rstate;
+
+		/*
+		 * Remove entries no longer necessary. The flag signals nothing if
+		 * subrel_local_state is not updated above. We can remove entries in
+		 * frozen hash safely.
+		 */
+		if (local_state_updated && !wstate->alive)
+		{
+			hash_search(subrel_local_state, &wstate->rs.relid,
+						HASH_REMOVE, NULL);
+			continue;
+		}
+
+		rstate = &wstate->rs;
+
+		/* This will be true in the next run only for live entries */
+		wstate->alive = false;
 
 		if (rstate->state == SUBREL_STATE_SYNCDONE)
 		{
@@ -344,7 +470,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * workers for this subscription, while we have the lock, for
 				 * later.
 				 */
-				nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+				nsyncworkers =
+					logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
 			LWLockRelease(LogicalRepWorkerLock);
 
 			/*
@@ -401,16 +528,28 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			 * there is some free sync worker slot, start new sync worker
 			 * for the table.
 			 */
-			else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
+			else if (!syncworker &&
+					 nsyncworkers < max_sync_workers_per_subscription)
 			{
-				logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-										 MySubscription->oid,
-										 MySubscription->name,
-										 MyLogicalRepWorker->userid,
-										 rstate->relid);
+				TimestampTz	now = GetCurrentTimestamp();
+
+				/* Keep moderate intervals from the previous launch */
+				if (TimestampDifferenceExceeds(wstate->last_worker_start, now,
+											   wal_retrieve_retry_interval))
+				{
+					logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+											 MySubscription->oid,
+											 MySubscription->name,
+											 MyLogicalRepWorker->userid,
+											 rstate->relid);
+					wstate->last_worker_start = now;
+				}
 			}
 		}
 	}
+
+	/* Sequential scan ended. Allow insertions again. */
+	hash_unfreeze(subrel_local_state);
 }
 
 /*
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 9f4f152..8f7337c 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -75,6 +75,5 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid,
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
 extern List *GetSubscriptionRelations(Oid subid);
-extern List *GetSubscriptionNotReadyRelations(Oid subid);
 
 #endif   /* PG_SUBSCRIPTION_REL_H */
-- 
2.9.2
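
The restart throttling in the second patch boils down to "launch only if at least the retry interval has passed since the last launch for this relation". A minimal stand-alone sketch of that check follows. The names `ToyWorkerState`, `toy_interval_exceeded` and `toy_maybe_launch` are hypothetical, plain milliseconds replace TimestampTz, and the ">=" comparison reflects my understanding of how TimestampDifferenceExceeds behaves, so treat it as an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical millisecond clock value standing in for TimestampTz. */
typedef int64_t toy_time_ms;

/* Per-relation launch bookkeeping, loosely modelled on the patch. */
typedef struct ToyWorkerState
{
	toy_time_ms last_worker_start;	/* 0 means "never launched" */
	int			launches;			/* counts launches, for illustration */
} ToyWorkerState;

/* True if at least interval_ms has elapsed between last and now. */
static bool
toy_interval_exceeded(toy_time_ms last, toy_time_ms now, int interval_ms)
{
	return (now - last) >= interval_ms;
}

/*
 * Launch a worker unless the previous launch for this relation was less
 * than retry_interval_ms ago.  Returns true if a launch happened.
 */
static bool
toy_maybe_launch(ToyWorkerState *ws, toy_time_ms now, int retry_interval_ms)
{
	if (!toy_interval_exceeded(ws->last_worker_start, now, retry_interval_ms))
		return false;

	ws->launches++;					/* stands in for the real worker launch */
	ws->last_worker_start = now;
	return true;
}
```

Because last_worker_start starts at 0, the first launch for a relation is not delayed; only relaunches within the interval are skipped, and they are retried on a later pass through the loop.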

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers