Hello,

At Sun, 23 Apr 2017 00:51:52 +0900, Masahiko Sawada <sawada.m...@gmail.com> 
wrote in <CAD21AoApBU+nz7FYaWX6gjyB9r6WWrTujbL4rrZiocHLc=p...@mail.gmail.com>
> On Fri, Apr 21, 2017 at 11:19 PM, Masahiko Sawada <sawada.m...@gmail.com> 
> wrote:
> > On Fri, Apr 21, 2017 at 5:33 PM, Kyotaro HORIGUCHI
> > <horiguchi.kyot...@lab.ntt.co.jp> wrote:
> >> Hello,
> >>
> >> At Thu, 20 Apr 2017 13:21:14 +0900, Masahiko Sawada 
> >> <sawada.m...@gmail.com> wrote in 
> >> <CAD21AoDrw0OaHE=ovrrhqx248kjj7w+1vim3k76ap46hnhj...@mail.gmail.com>
> >>> On Thu, Apr 20, 2017 at 12:30 AM, Petr Jelinek
> >>> <petr.jeli...@2ndquadrant.com> wrote:
> >>> > On 19/04/17 15:57, Masahiko Sawada wrote:
> >>> >> On Wed, Apr 19, 2017 at 10:07 PM, Petr Jelinek
> >>> >> <petr.jeli...@2ndquadrant.com> wrote:
> >>> >> Yeah, sorry, I meant that we don't want to wait but cannot launch the
> >>> >> tablesync worker in such case.
> >>> >>
> >>> >> But after more thought, the latest patch Peter proposed has the same
> >>> >> problem. Perhaps we need to scan always whole pg_subscription_rel and
> >>> >> remove the entry if the corresponding table get synced.
> >>> >>
> >>> >
> >>> > Yes that's what I mean by "Why can't we just update the hashtable based
> >>> > on the catalog". And if we do that then I don't understand why do we
> >>> > need both hastable and linked list if we need to update both based on
> >>> > catalog reads anyway.
> >>>
> >>> Thanks, I've now understood correctly. Yes, I think you're right. If
> >>> we update the hash table based on the catalog whenever table state is
> >>> invalidated, we don't need to have both hash table and list.
> >>
> >> Ah, ok. The patch from Peter still generating and replacing the
> >> content of the list. The attached patch stores everything into
> >> SubscriptionRelState. Oppositte to my anticiation, the hash can
> >> be efectively kept small and removed.
> >>
> >
> > Thank you for the patch!
> > Actually, I also bumped into the same the situation where we got the
> > following error during hash_seq_search. I guess we cannot commit a
> > transaction during hash_seq_scan but the sequential scan loop in
> > process_syncing_tables_for_apply could attempt to do that.

Ah. Thanks. I forgot that I saw the same error once. The hash has
nothing to do with any transactions. The scan now runs after
freezing of the hash.

Petr:
> I think we should document why this is done this way - I mean it's not
> very obvious that it's okay to update just when not found and why and
> even less obvious that it would be in fact wrong to update already
> existing entry.

It is not prudently designed. I changed there.. seemingly more
reasonable, maybe.

Petr:
> I also think that in it's current form the
> GetSubscriptionNotReadyRelations should be moved to tablesync.c 

Removed then moved to tablesync.c.

Petr:
> I also think we can't add the new fields to
> SubscriptionRelState directly as that's used by catalog access
> functions as well.

Yeah, it was my lazyness. A new struct SubscriptionWorkerState is
added in tablesync.c and the interval stuff runs on it.


At Sun, 23 Apr 2017 00:51:52 +0900, Masahiko Sawada <sawada.m...@gmail.com> 
wrote in <CAD21AoApBU+nz7FYaWX6gjyB9r6WWrTujbL4rrZiocHLc=p...@mail.gmail.com>
> So I guess we should commit the changing status to SUBREL_STATE_READY
> after finished hash_seq_scan.

We could so either, but I thought that a hash can explicitly
"unfreeze" without a side effect. The attached first one adds the
function in dynahash.c. I don't think that just allowing
unregsiterd scan on unfrozen hash is worse that this.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
>From dcd8ad43f3dceac4018bfb0819d37190a1fd8ef8 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 24 Apr 2017 16:06:10 +0900
Subject: [PATCH 1/2] Add unfrozen feature to dynahash

Scans on unfrozen hash cannot live beyond a transaction lifetime.  A
hash can be frozen to allow modify-free sequential scan which can live
ybeyond transactions but there's no means to release the status. This
patch adds a new function hash_unfreeze to allow us to reset frozen
hashes to insertable state.
---
 src/backend/utils/hash/dynahash.c | 20 ++++++++++++++++++++
 src/include/utils/hsearch.h       |  1 +
 2 files changed, 21 insertions(+)

diff --git a/src/backend/utils/hash/dynahash.c b/src/backend/utils/hash/dynahash.c
index 12b1658..073e2bc 100644
--- a/src/backend/utils/hash/dynahash.c
+++ b/src/backend/utils/hash/dynahash.c
@@ -1334,6 +1334,9 @@ hash_get_num_entries(HTAB *hashp)
  * wherein it is inconvenient to track whether a scan is still open, and
  * there's no possibility of further insertions after readout has begun.
  *
+ * NOTE: it is possible to unfreeze a frozen hash. All running sequential
+ * scans must be abandoned by the scanners' responsibility.
+ *
  * NOTE: to use this with a partitioned hashtable, caller had better hold
  * at least shared lock on all partitions of the table throughout the scan!
  * We can cope with insertions or deletions by our own backend, but *not*
@@ -1456,6 +1459,23 @@ hash_freeze(HTAB *hashp)
 	hashp->frozen = true;
 }
 
+/*
+ * hash_unfreeze
+ *			Reset the freeze state of a hashtable
+ *
+ * This re-enables insertion to a hash again. Active sequentual scans started
+ * during the freeze must not continue here after.
+ */
+void
+hash_unfreeze(HTAB *hashp)
+{
+	Assert (!hashp->isshared);
+	if (!hashp->frozen)
+		elog(ERROR, "this hash hashtable \"%s\" is not frozen",
+			 hashp->tabname);
+	hashp->frozen = false;
+}
+
 
 /********************************* UTILITIES ************************/
 
diff --git a/src/include/utils/hsearch.h b/src/include/utils/hsearch.h
index 7964087..60920d4 100644
--- a/src/include/utils/hsearch.h
+++ b/src/include/utils/hsearch.h
@@ -136,6 +136,7 @@ extern void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp);
 extern void *hash_seq_search(HASH_SEQ_STATUS *status);
 extern void hash_seq_term(HASH_SEQ_STATUS *status);
 extern void hash_freeze(HTAB *hashp);
+extern void hash_unfreeze(HTAB *hashp);
 extern Size hash_estimate_size(long num_entries, Size entrysize);
 extern long hash_select_dirsize(long num_entries);
 extern Size hash_get_shared_size(HASHCTL *info, int flags);
-- 
2.9.2

>From b1c86c37d442f9a90330df2f47a0680fa478ccde Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 24 Apr 2017 13:39:17 +0900
Subject: [PATCH 2/2] Wait between tablesync worker restarts

Before restarting a tablesync worker for the same relation, wait
wal_retrieve_retry_interval.  This avoids restarting failing workers in
a tight loop.
---
 src/backend/catalog/pg_subscription.c       |  52 -------
 src/backend/replication/logical/tablesync.c | 205 +++++++++++++++++++++++-----
 src/include/catalog/pg_subscription_rel.h   |   1 -
 3 files changed, 172 insertions(+), 86 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a183850..ff1ed62 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -446,55 +446,3 @@ GetSubscriptionRelations(Oid subid)
 
 	return res;
 }
-
-/*
- * Get all relations for subscription that are not in a ready state.
- *
- * Returned list is palloced in current memory context.
- */
-List *
-GetSubscriptionNotReadyRelations(Oid subid)
-{
-	List	   *res = NIL;
-	Relation	rel;
-	HeapTuple	tup;
-	int			nkeys = 0;
-	ScanKeyData	skey[2];
-	SysScanDesc	scan;
-
-	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
-
-	ScanKeyInit(&skey[nkeys++],
-				Anum_pg_subscription_rel_srsubid,
-				BTEqualStrategyNumber, F_OIDEQ,
-				ObjectIdGetDatum(subid));
-
-	ScanKeyInit(&skey[nkeys++],
-				Anum_pg_subscription_rel_srsubstate,
-				BTEqualStrategyNumber, F_CHARNE,
-				CharGetDatum(SUBREL_STATE_READY));
-
-	scan = systable_beginscan(rel, InvalidOid, false,
-							  NULL, nkeys, skey);
-
-	while (HeapTupleIsValid(tup = systable_getnext(scan)))
-	{
-		Form_pg_subscription_rel	subrel;
-		SubscriptionRelState	   *relstate;
-
-		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
-
-		relstate = (SubscriptionRelState *)palloc(sizeof(SubscriptionRelState));
-		relstate->relid = subrel->srrelid;
-		relstate->state = subrel->srsubstate;
-		relstate->lsn = subrel->srsublsn;
-
-		res = lappend(res, relstate);
-	}
-
-	/* Cleanup */
-	systable_endscan(scan);
-	heap_close(rel, AccessShareLock);
-
-	return res;
-}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index b4b48d9..e10b258 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -103,9 +103,18 @@
 #include "storage/ipc.h"
 
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 
+/* Struct to track subscription worker startup */
+typedef struct SubscriptionWorkerState
+{
+	SubscriptionRelState rs;		/* key is rs.relid */
+	TimestampTz last_worker_start;	/* time of the last launch of worker */
+	bool		alive;				/* this relid is alive in the catalog */
+} SubscriptionWorkerState;
+
 static bool table_states_valid = false;
 
 StringInfo	copybuf = NULL;
@@ -237,6 +246,80 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 }
 
 /*
+ * Get all relations for subscription that are not in a ready state.
+ *
+ * Updates the given hash and returns the number of live entries.
+ *
+ * Live entries are the hash entries that a corresponding tuple is found in
+ * pg_subscription_rel.
+ *
+ */
+static int
+GetSubscriptionNotReadyRelations(struct HTAB *localstate, Oid subid)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	int			nkeys = 0;
+	ScanKeyData	skey[2];
+	SysScanDesc	scan;
+	int			ret = 0;
+
+	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&skey[nkeys++],
+				Anum_pg_subscription_rel_srsubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(subid));
+
+	ScanKeyInit(&skey[nkeys++],
+				Anum_pg_subscription_rel_srsubstate,
+				BTEqualStrategyNumber, F_CHARNE,
+				CharGetDatum(SUBREL_STATE_READY));
+
+	scan = systable_beginscan(rel, InvalidOid, false,
+							  NULL, nkeys, skey);
+
+	/* Update entries in the given hash */
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_subscription_rel	subrel;
+		SubscriptionWorkerState	   *wstate;
+		bool						found = false;
+
+		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+		wstate  = hash_search(localstate, (void *) &subrel->srrelid,
+							  HASH_ENTER, &found);
+
+		/*
+		 * rs.relid is the key of this entry, which won't be changed. The rest
+		 * fields should be initialized only at the first time.
+		 */
+		if (!found)
+		{
+			wstate->rs.relid = subrel->srrelid;
+			wstate->last_worker_start = 0;
+			wstate->alive = false;
+		}
+
+		/* These two fields should track the status in catalog */
+		wstate->rs.lsn = subrel->srsublsn;
+		wstate->rs.state = subrel->srsubstate;
+
+		/* Set as alive since found in the catalog. */
+		Assert(!wstate->alive);
+		wstate->alive = true;
+		ret++;
+	}
+
+	/* Cleanup */
+	systable_endscan(scan);
+	heap_close(rel, AccessShareLock);
+
+	return ret;
+}
+
+/*
  * Handle table synchronization cooperation from the apply worker.
  *
  * Walk over all subscription tables that are individually tracked by the
@@ -245,7 +328,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  *
  * If there are tables that need synchronizing and are not being synchronized
  * yet, start sync workers for them (if there are free slots for sync
- * workers).
+ * workers).  To prevent starting the sync worker for the same relation at a
+ * high frequency after a failure, we store its last start time with each sync
+ * state info.  We start the sync worker for the same relation after waiting
+ * at least wal_retrieve_retry_interval.
  *
  * For tables that are being synchronized already, check if sync workers
  * either need action from the apply worker or have finished.
@@ -263,47 +349,87 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 static void
 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 {
-	static List *table_states = NIL;
-	ListCell   *lc;
+	static HTAB				   *subrel_local_state = NULL;
+	HASH_SEQ_STATUS				hash_seq;
+	SubscriptionWorkerState	   *wstate;
+	bool						local_state_updated = false;
+	int							nrels = 0;
 
 	Assert(!IsTransactionState());
 
 	/* We need up to date sync state info for subscription tables here. */
 	if (!table_states_valid)
 	{
-		MemoryContext	oldctx;
-		List		   *rstates;
-		ListCell	   *lc;
-		SubscriptionRelState *rstate;
-
-		/* Clean the old list. */
-		list_free_deep(table_states);
-		table_states = NIL;
+		/* Create the local state hash if not exists */
+		if (!subrel_local_state)
+		{
+			HASHCTL		ctl;
+
+			memset(&ctl, 0, sizeof(ctl));
+			ctl.keysize = sizeof(Oid);
+			ctl.entrysize = sizeof(SubscriptionRelState);
+			subrel_local_state =
+				hash_create("Logical replication table sync worker state",
+							256, &ctl, HASH_ELEM | HASH_BLOBS);
+		}
 
 		StartTransactionCommand();
 
-		/* Fetch all non-ready tables. */
-		rstates	= GetSubscriptionNotReadyRelations(MySubscription->oid);
+		/* Update local state hash with non-ready tables. */
+		nrels = GetSubscriptionNotReadyRelations(subrel_local_state,
+												 MySubscription->oid);
+		local_state_updated = true;
+		CommitTransactionCommand();
 
-		/* Allocate the tracking info in a permanent memory context. */
-		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-		foreach(lc, rstates)
+		table_states_valid = true;
+	}
+	/* Just count the entries if the local_state is not update this time */
+	else if (subrel_local_state != NULL)
+		nrels = hash_get_num_entries(subrel_local_state);
+
+	/*  No relation to be synched */
+	if (nrels == 0)
+	{
+		if (subrel_local_state != NULL)
 		{
-			rstate = palloc(sizeof(SubscriptionRelState));
-			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
-			table_states = lappend(table_states, rstate);
+			hash_destroy(subrel_local_state);
+			subrel_local_state = NULL;
 		}
-		MemoryContextSwitchTo(oldctx);
 
-		CommitTransactionCommand();
-
-		table_states_valid = true;
+		return;
 	}
 
-	/* Process all tables that are being synchronized. */
-	foreach(lc, table_states)
+	/*
+	 * Freeze hash table. This guarantees that the hash won't be broken for
+	 * the scan below. Unfreezing on error leavs the hash freezed but any
+	 * errors within this loop blows away the worker process involving the
+	 * hash.
+	 */
+	hash_freeze(subrel_local_state);
+
+	/* Process all tables that are to be synchronized. */
+	hash_seq_init(&hash_seq, subrel_local_state);
+
+	while ((wstate = hash_seq_search(&hash_seq)) != NULL)
 	{
-		SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc);
+		SubscriptionRelState	*rstate;
+
+		/*
+		 * Remove entries no longer necessary. The flag signals nothing if
+		 * subrel_local_state is not updated above. We can remove entries in
+		 * frozen hash safely.
+		 */
+		if (local_state_updated && !wstate->alive)
+		{
+			hash_search(subrel_local_state, &wstate->rs.relid,
+						HASH_REMOVE, NULL);
+			continue;
+		}
+
+		rstate = &wstate->rs;
+
+		/* This will be true in the next rurn only for live entries */
+		wstate->alive = false;
 
 		if (rstate->state == SUBREL_STATE_SYNCDONE)
 		{
@@ -344,7 +470,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * workers for this subscription, while we have the lock, for
 				 * later.
 				 */
-				nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+				nsyncworkers =
+					logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
 			LWLockRelease(LogicalRepWorkerLock);
 
 			/*
@@ -401,16 +528,28 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			 * there is some free sync worker slot, start new sync worker
 			 * for the table.
 			 */
-			else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
+			else if (!syncworker &&
+					 nsyncworkers < max_sync_workers_per_subscription)
 			{
-				logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-										 MySubscription->oid,
-										 MySubscription->name,
-										 MyLogicalRepWorker->userid,
-										 rstate->relid);
+				TimestampTz	now = GetCurrentTimestamp();
+
+				/* Keep moderate intervals from the previous launch */
+				if (TimestampDifferenceExceeds(wstate->last_worker_start, now,
+											   wal_retrieve_retry_interval))
+				{
+					logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+											 MySubscription->oid,
+											 MySubscription->name,
+											 MyLogicalRepWorker->userid,
+											 rstate->relid);
+					wstate->last_worker_start = now;
+				}
 			}
 		}
 	}
+
+	/* sequential scan ended. Allow insertions again. */
+	hash_unfreeze(subrel_local_state);
 }
 
 /*
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 9f4f152..8f7337c 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -75,6 +75,5 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid,
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
 extern List *GetSubscriptionRelations(Oid subid);
-extern List *GetSubscriptionNotReadyRelations(Oid subid);
 
 #endif   /* PG_SUBSCRIPTION_REL_H */
-- 
2.9.2

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to