Hello,

At Tue, 25 Apr 2017 00:52:09 +0900, Masahiko Sawada <sawada.m...@gmail.com> 
wrote in <cad21aochpfbqkrifsmmzsxm5qbb5rd_hzmrnil4e19t2r8w...@mail.gmail.com>
> + elog(ERROR, "this hash hashtable \"%s\" is not frozen",
> + hashp->tabname);
> 
> Maybe the error message should be "this hashtable \"%s\" is not frozen".

Both "hashtable" and "hash table" appear there, but hash_freeze
uses the former, so I followed that.

> s/leavs/leaves/
> s/freezed/frozen/
> s/rurn/run/

Thanks! But "rurn" was a typo for "turn".


> On Tue, Apr 25, 2017 at 1:42 AM, Petr Jelinek
> <petr.jeli...@2ndquadrant.com> wrote:
> > On 24/04/17 17:52, Masahiko Sawada wrote:
> >> On Mon, Apr 24, 2017 at 4:41 PM, Kyotaro HORIGUCHI
> >> <horiguchi.kyot...@lab.ntt.co.jp> wrote:
> >> + /*
> >> + * Remove entries no longer necessary. The flag signals nothing if
> >> + * subrel_local_state is not updated above. We can remove entries in
> >> + * frozen hash safely.
> >> + */
> >> + if (local_state_updated && !wstate->alive)
> >> + {
> >> + hash_search(subrel_local_state, &wstate->rs.relid,
> >> + HASH_REMOVE, NULL);
> >> + continue;
> >> + }
> >>
> >> IIUC since the apply worker can change the status from
> >> SUBREL_STATE_SYNCWAIT to SUBREL_STATE_READY in a hash_seq_search loop
> >> the table sync worker which is changed to SUBREL_STATE_READY by the
> >> apply worker before updating the subrel_local_state could be remained
> >> in the hash table.

Every time pg_subscription_rel is updated, a hash entry is marked
alive only if the corresponding not-ready relation is found in
pg_subscription_rel. If any live entries remain, nrels becomes
positive and the dead entries are removed in the loop just after.
If no entry survives, the hash is destroyed immediately. Some dead
entries can survive under certain conditions, but one of the above
will happen shortly afterwards.

If this is hard to follow, I should perhaps add some more
comments.
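To make the mark-and-sweep described above concrete, here is a minimal standalone sketch in C. It does not use dynahash: a fixed-size array stands in for the hash, and ToyState, toy_enter, toy_refresh, toy_sweep and toy_count are hypothetical names. toy_refresh plays the role of GetSubscriptionNotReadyRelations (mark), and toy_sweep plays the role of the loop in process_syncing_tables_for_apply (sweep). Destroying the whole hash when nrels is zero is not modeled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_RELS 16

/* Hypothetical stand-in for SubscriptionWorkerState; the key is relid. */
typedef struct ToyState
{
	unsigned int relid;
	bool		used;			/* slot is occupied */
	bool		alive;			/* seen in the latest catalog scan */
} ToyState;

static ToyState table[MAX_RELS];

/* Find the entry for relid, creating it if needed (like HASH_ENTER). */
static ToyState *
toy_enter(unsigned int relid)
{
	ToyState   *free_slot = NULL;

	for (size_t i = 0; i < MAX_RELS; i++)
	{
		if (table[i].used && table[i].relid == relid)
			return &table[i];
		if (!table[i].used && free_slot == NULL)
			free_slot = &table[i];
	}
	assert(free_slot != NULL);	/* the toy table must not overflow */
	free_slot->used = true;
	free_slot->relid = relid;
	free_slot->alive = false;
	return free_slot;
}

/*
 * Mark phase: every relation still listed as not-ready becomes alive.
 * Returns the number of live entries, like nrels in the patch.
 */
static int
toy_refresh(const unsigned int *notready, size_t n)
{
	for (size_t i = 0; i < n; i++)
	{
		ToyState   *s = toy_enter(notready[i]);

		assert(!s->alive);		/* the previous sweep reset every flag */
		s->alive = true;
	}
	return (int) n;
}

/*
 * Sweep phase: if the table was refreshed this round, drop entries that
 * were not marked; reset the flag on every surviving entry.
 */
static void
toy_sweep(bool refreshed)
{
	for (size_t i = 0; i < MAX_RELS; i++)
	{
		if (!table[i].used)
			continue;
		if (refreshed && !table[i].alive)
		{
			table[i].used = false;	/* like HASH_REMOVE */
			continue;
		}
		table[i].alive = false;
	}
}

/* Number of occupied entries, like hash_get_num_entries(). */
static int
toy_count(void)
{
	int			n = 0;

	for (size_t i = 0; i < MAX_RELS; i++)
		if (table[i].used)
			n++;
	return n;
}
```

For example, refreshing with relids {10, 20, 30} and then with {20} leaves only the entry for 20 after the second sweep, while a sweep without a refresh removes nothing.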

> >>                    I think that we should scan pg_subscription_rel by
> >> using only a condition "subid".
> >>
> >
> > I don't follow this.
> >
> 
> Hmm, I'd misunderstood something. It should work fine. Sorry for the noise.

Anyway, the typo-fixed version is attached.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From c79c2c47a325e7b03e217cfed9cd4ac50ec22423 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 24 Apr 2017 16:06:10 +0900
Subject: [PATCH 1/2] Add unfreeze feature to dynahash

Sequential scans on an unfrozen hash cannot live beyond the lifetime of
a transaction. A hash can be frozen to allow modification-free
sequential scans that can outlive transactions, but there is no means
to release that state. This patch adds a new function, hash_unfreeze,
to reset a frozen hash to an insertable state.
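A minimal sketch of the intended state transitions, using a toy table rather than the real HTAB: ToyHash, toy_freeze, toy_unfreeze and toy_insert are hypothetical names. Insertion is refused while the table is frozen (the real dynahash raises an error for HASH_ENTER on a frozen table instead of returning false), and unfreezing a table that is not frozen is reported as an error, as the proposed hash_unfreeze does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical toy model of the frozen flag; not the real HTAB. */
typedef struct ToyHash
{
	int			nentries;
	bool		frozen;
	bool		isshared;
} ToyHash;

/* Like hash_freeze(): refuse further insertions. */
static void
toy_freeze(ToyHash *h)
{
	h->frozen = true;
}

/*
 * Like the proposed hash_unfreeze(): shared tables are not supported, and
 * unfreezing a table that is not frozen is an error (reported here by
 * returning false instead of elog(ERROR)).
 */
static bool
toy_unfreeze(ToyHash *h)
{
	assert(!h->isshared);
	if (!h->frozen)
	{
		fprintf(stderr, "this hashtable is not frozen\n");
		return false;
	}
	h->frozen = false;
	return true;
}

/* Insertion succeeds only while the table is not frozen. */
static bool
toy_insert(ToyHash *h)
{
	if (h->frozen)
		return false;
	h->nentries++;
	return true;
}
```

The freeze/unfreeze pair brackets a scan: freeze, scan (deleting only the entry just returned), then unfreeze so that the next refresh can insert new entries.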
---
 src/backend/utils/hash/dynahash.c | 20 ++++++++++++++++++++
 src/include/utils/hsearch.h       |  1 +
 2 files changed, 21 insertions(+)

diff --git a/src/backend/utils/hash/dynahash.c b/src/backend/utils/hash/dynahash.c
index 12b1658..e253ab2 100644
--- a/src/backend/utils/hash/dynahash.c
+++ b/src/backend/utils/hash/dynahash.c
@@ -1334,6 +1334,9 @@ hash_get_num_entries(HTAB *hashp)
  * wherein it is inconvenient to track whether a scan is still open, and
  * there's no possibility of further insertions after readout has begun.
  *
+ * NOTE: a frozen hash can be unfrozen with hash_unfreeze. Scanners are
+ * responsible for abandoning any running sequential scans before that.
+ *
  * NOTE: to use this with a partitioned hashtable, caller had better hold
  * at least shared lock on all partitions of the table throughout the scan!
  * We can cope with insertions or deletions by our own backend, but *not*
@@ -1456,6 +1459,23 @@ hash_freeze(HTAB *hashp)
 	hashp->frozen = true;
 }
 
+/*
+ * hash_unfreeze
+ *			Reset the freeze state of a hashtable
+ *
+ * This re-enables insertion into the hash. Sequential scans started while
+ * the hash was frozen must not be continued afterwards.
+ */
+void
+hash_unfreeze(HTAB *hashp)
+{
+	Assert(!hashp->isshared);
+	if (!hashp->frozen)
+		elog(ERROR, "this hashtable \"%s\" is not frozen",
+			 hashp->tabname);
+	hashp->frozen = false;
+}
+
 
 /********************************* UTILITIES ************************/
 
diff --git a/src/include/utils/hsearch.h b/src/include/utils/hsearch.h
index 7964087..60920d4 100644
--- a/src/include/utils/hsearch.h
+++ b/src/include/utils/hsearch.h
@@ -136,6 +136,7 @@ extern void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp);
 extern void *hash_seq_search(HASH_SEQ_STATUS *status);
 extern void hash_seq_term(HASH_SEQ_STATUS *status);
 extern void hash_freeze(HTAB *hashp);
+extern void hash_unfreeze(HTAB *hashp);
 extern Size hash_estimate_size(long num_entries, Size entrysize);
 extern long hash_select_dirsize(long num_entries);
 extern Size hash_get_shared_size(HASHCTL *info, int flags);
-- 
2.9.2

From a422d7b936eb50c885e5480bc21c7bae636a559a Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Mon, 24 Apr 2017 13:39:17 +0900
Subject: [PATCH 2/2] Wait between tablesync worker restarts

Before restarting a tablesync worker for the same relation, wait
wal_retrieve_retry_interval.  This avoids restarting failing workers in
a tight loop.
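The throttling rule can be sketched standalone as follows, assuming int64 microsecond timestamps as TimestampTz uses. toy_difference_exceeds mimics the check TimestampDifferenceExceeds performs, and ToyWorkerState and toy_maybe_launch are hypothetical stand-ins for the per-relation state and the logicalrep_worker_launch call; the 5000 ms interval in the usage is just an example value.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Microsecond timestamps, like PostgreSQL's int64 TimestampTz. */
typedef int64_t ToyTimestamp;

/* Has at least msec milliseconds elapsed between start and stop? */
static bool
toy_difference_exceeds(ToyTimestamp start, ToyTimestamp stop, int msec)
{
	return (stop - start) >= (int64_t) msec * 1000;
}

/* Hypothetical per-relation launch bookkeeping. */
typedef struct ToyWorkerState
{
	ToyTimestamp last_worker_start;	/* 0 means never launched */
	int			launches;			/* number of launches so far */
} ToyWorkerState;

/*
 * Launch a sync worker only if retry_interval_ms has passed since the
 * previous launch for this relation.  Returns true if it launched.
 */
static bool
toy_maybe_launch(ToyWorkerState *ws, ToyTimestamp now, int retry_interval_ms)
{
	if (!toy_difference_exceeds(ws->last_worker_start, now, retry_interval_ms))
		return false;
	ws->launches++;				/* stands in for the real worker launch */
	ws->last_worker_start = now;
	return true;
}
```

With a 5000 ms interval, a first launch succeeds, a retry one second later is skipped, and a retry five seconds after the first launch goes ahead.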
---
 src/backend/catalog/pg_subscription.c       |  52 -------
 src/backend/replication/logical/tablesync.c | 205 +++++++++++++++++++++++-----
 src/include/catalog/pg_subscription_rel.h   |   1 -
 3 files changed, 172 insertions(+), 86 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a183850..ff1ed62 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -446,55 +446,3 @@ GetSubscriptionRelations(Oid subid)
 
 	return res;
 }
-
-/*
- * Get all relations for subscription that are not in a ready state.
- *
- * Returned list is palloced in current memory context.
- */
-List *
-GetSubscriptionNotReadyRelations(Oid subid)
-{
-	List	   *res = NIL;
-	Relation	rel;
-	HeapTuple	tup;
-	int			nkeys = 0;
-	ScanKeyData	skey[2];
-	SysScanDesc	scan;
-
-	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
-
-	ScanKeyInit(&skey[nkeys++],
-				Anum_pg_subscription_rel_srsubid,
-				BTEqualStrategyNumber, F_OIDEQ,
-				ObjectIdGetDatum(subid));
-
-	ScanKeyInit(&skey[nkeys++],
-				Anum_pg_subscription_rel_srsubstate,
-				BTEqualStrategyNumber, F_CHARNE,
-				CharGetDatum(SUBREL_STATE_READY));
-
-	scan = systable_beginscan(rel, InvalidOid, false,
-							  NULL, nkeys, skey);
-
-	while (HeapTupleIsValid(tup = systable_getnext(scan)))
-	{
-		Form_pg_subscription_rel	subrel;
-		SubscriptionRelState	   *relstate;
-
-		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
-
-		relstate = (SubscriptionRelState *)palloc(sizeof(SubscriptionRelState));
-		relstate->relid = subrel->srrelid;
-		relstate->state = subrel->srsubstate;
-		relstate->lsn = subrel->srsublsn;
-
-		res = lappend(res, relstate);
-	}
-
-	/* Cleanup */
-	systable_endscan(scan);
-	heap_close(rel, AccessShareLock);
-
-	return res;
-}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index b4b48d9..f1c2d71 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -103,9 +103,18 @@
 #include "storage/ipc.h"
 
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 
+/* Struct to track subscription worker startup */
+typedef struct SubscriptionWorkerState
+{
+	SubscriptionRelState rs;		/* key is rs.relid */
+	TimestampTz last_worker_start;	/* time of the last launch of worker */
+	bool		alive;				/* this relid is alive in the catalog */
+} SubscriptionWorkerState;
+
 static bool table_states_valid = false;
 
 StringInfo	copybuf = NULL;
@@ -237,6 +246,80 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 }
 
 /*
+ * Get all relations for subscription that are not in a ready state.
+ *
+ * Updates the given hash and returns the number of live entries.
+ *
+ * Live entries are the hash entries for which a corresponding tuple is
+ * found in pg_subscription_rel.
+ *
+ */
+static int
+GetSubscriptionNotReadyRelations(struct HTAB *localstate, Oid subid)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	int			nkeys = 0;
+	ScanKeyData	skey[2];
+	SysScanDesc	scan;
+	int			ret = 0;
+
+	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
+
+	ScanKeyInit(&skey[nkeys++],
+				Anum_pg_subscription_rel_srsubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(subid));
+
+	ScanKeyInit(&skey[nkeys++],
+				Anum_pg_subscription_rel_srsubstate,
+				BTEqualStrategyNumber, F_CHARNE,
+				CharGetDatum(SUBREL_STATE_READY));
+
+	scan = systable_beginscan(rel, InvalidOid, false,
+							  NULL, nkeys, skey);
+
+	/* Update entries in the given hash */
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_subscription_rel	subrel;
+		SubscriptionWorkerState	   *wstate;
+		bool						found = false;
+
+		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+		wstate = hash_search(localstate, (void *) &subrel->srrelid,
+							  HASH_ENTER, &found);
+
+		/*
+		 * rs.relid is the key of this entry and never changes. The remaining
+		 * fields are initialized only when the entry is first created.
+		 */
+		if (!found)
+		{
+			wstate->rs.relid = subrel->srrelid;
+			wstate->last_worker_start = 0;
+			wstate->alive = false;
+		}
+
+		/* These two fields track the current status in the catalog */
+		wstate->rs.lsn = subrel->srsublsn;
+		wstate->rs.state = subrel->srsubstate;
+
+		/* Set as alive since found in the catalog. */
+		Assert(!wstate->alive);
+		wstate->alive = true;
+		ret++;
+	}
+
+	/* Cleanup */
+	systable_endscan(scan);
+	heap_close(rel, AccessShareLock);
+
+	return ret;
+}
+
+/*
  * Handle table synchronization cooperation from the apply worker.
  *
  * Walk over all subscription tables that are individually tracked by the
@@ -245,7 +328,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  *
  * If there are tables that need synchronizing and are not being synchronized
  * yet, start sync workers for them (if there are free slots for sync
- * workers).
+ * workers).  To prevent starting the sync worker for the same relation at a
+ * high frequency after a failure, we store its last start time with each sync
+ * state info.  We start the sync worker for the same relation after waiting
+ * at least wal_retrieve_retry_interval.
  *
  * For tables that are being synchronized already, check if sync workers
  * either need action from the apply worker or have finished.
@@ -263,47 +349,87 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
 static void
 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 {
-	static List *table_states = NIL;
-	ListCell   *lc;
+	static HTAB				   *subrel_local_state = NULL;
+	HASH_SEQ_STATUS				hash_seq;
+	SubscriptionWorkerState	   *wstate;
+	bool						local_state_updated = false;
+	int							nrels = 0;
 
 	Assert(!IsTransactionState());
 
 	/* We need up to date sync state info for subscription tables here. */
 	if (!table_states_valid)
 	{
-		MemoryContext	oldctx;
-		List		   *rstates;
-		ListCell	   *lc;
-		SubscriptionRelState *rstate;
-
-		/* Clean the old list. */
-		list_free_deep(table_states);
-		table_states = NIL;
+		/* Create the local state hash if it does not exist yet */
+		if (!subrel_local_state)
+		{
+			HASHCTL		ctl;
+
+			memset(&ctl, 0, sizeof(ctl));
+			ctl.keysize = sizeof(Oid);
+			ctl.entrysize = sizeof(SubscriptionWorkerState);
+			subrel_local_state =
+				hash_create("Logical replication table sync worker state",
+							256, &ctl, HASH_ELEM | HASH_BLOBS);
+		}
 
 		StartTransactionCommand();
 
-		/* Fetch all non-ready tables. */
-		rstates	= GetSubscriptionNotReadyRelations(MySubscription->oid);
+		/* Update local state hash with non-ready tables. */
+		nrels = GetSubscriptionNotReadyRelations(subrel_local_state,
+												 MySubscription->oid);
+		local_state_updated = true;
+		CommitTransactionCommand();
 
-		/* Allocate the tracking info in a permanent memory context. */
-		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-		foreach(lc, rstates)
+		table_states_valid = true;
+	}
+	/* Just count the entries if the local state was not updated this time */
+	else if (subrel_local_state != NULL)
+		nrels = hash_get_num_entries(subrel_local_state);
+
+	/* No relations to be synchronized */
+	if (nrels == 0)
+	{
+		if (subrel_local_state != NULL)
 		{
-			rstate = palloc(sizeof(SubscriptionRelState));
-			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
-			table_states = lappend(table_states, rstate);
+			hash_destroy(subrel_local_state);
+			subrel_local_state = NULL;
 		}
-		MemoryContextSwitchTo(oldctx);
 
-		CommitTransactionCommand();
-
-		table_states_valid = true;
+		return;
 	}
 
-	/* Process all tables that are being synchronized. */
-	foreach(lc, table_states)
+	/*
+	 * Freeze the hash table so that no insertion can disturb the scan below.
+	 * An error inside the loop leaves the hash frozen, since nothing
+	 * unfreezes it on error, but any such error terminates the worker
+	 * process that owns the hash anyway.
+	 */
+	hash_freeze(subrel_local_state);
+
+	/* Process all tables that are to be synchronized. */
+	hash_seq_init(&hash_seq, subrel_local_state);
+
+	while ((wstate = hash_seq_search(&hash_seq)) != NULL)
 	{
-		SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc);
+		SubscriptionRelState	*rstate;
+
+		/*
+		 * Remove entries that are no longer needed. The alive flag is only
+		 * meaningful if subrel_local_state was updated above. Deleting the
+		 * entry just returned by hash_seq_search is safe during the scan.
+		 */
+		if (local_state_updated && !wstate->alive)
+		{
+			hash_search(subrel_local_state, &wstate->rs.relid,
+						HASH_REMOVE, NULL);
+			continue;
+		}
+
+		rstate = &wstate->rs;
+
+		/* Reset; the next catalog refresh sets it again for live entries */
+		wstate->alive = false;
 
 		if (rstate->state == SUBREL_STATE_SYNCDONE)
 		{
@@ -344,7 +470,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 				 * workers for this subscription, while we have the lock, for
 				 * later.
 				 */
-				nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+				nsyncworkers =
+					logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
 			LWLockRelease(LogicalRepWorkerLock);
 
 			/*
@@ -401,16 +528,28 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			 * there is some free sync worker slot, start new sync worker
 			 * for the table.
 			 */
-			else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
+			else if (!syncworker &&
+					 nsyncworkers < max_sync_workers_per_subscription)
 			{
-				logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-										 MySubscription->oid,
-										 MySubscription->name,
-										 MyLogicalRepWorker->userid,
-										 rstate->relid);
+				TimestampTz	now = GetCurrentTimestamp();
+
+				/* Throttle relaunches by wal_retrieve_retry_interval */
+				if (TimestampDifferenceExceeds(wstate->last_worker_start, now,
+											   wal_retrieve_retry_interval))
+				{
+					logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+											 MySubscription->oid,
+											 MySubscription->name,
+											 MyLogicalRepWorker->userid,
+											 rstate->relid);
+					wstate->last_worker_start = now;
+				}
 			}
 		}
 	}
+
+	/* The sequential scan has ended; allow insertions again. */
+	hash_unfreeze(subrel_local_state);
 }
 
 /*
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 9f4f152..8f7337c 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -75,6 +75,5 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid,
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
 extern List *GetSubscriptionRelations(Oid subid);
-extern List *GetSubscriptionNotReadyRelations(Oid subid);
 
 #endif   /* PG_SUBSCRIPTION_REL_H */
-- 
2.9.2
