Hello,

At Thu, 20 Apr 2017 13:21:14 +0900, Masahiko Sawada <sawada.m...@gmail.com> 
wrote in <CAD21AoDrw0OaHE=ovrrhqx248kjj7w+1vim3k76ap46hnhj...@mail.gmail.com>
> On Thu, Apr 20, 2017 at 12:30 AM, Petr Jelinek
> <petr.jeli...@2ndquadrant.com> wrote:
> > On 19/04/17 15:57, Masahiko Sawada wrote:
> >> On Wed, Apr 19, 2017 at 10:07 PM, Petr Jelinek
> >> <petr.jeli...@2ndquadrant.com> wrote:
> >>> On 19/04/17 14:42, Masahiko Sawada wrote:
> >>>> On Wed, Apr 19, 2017 at 5:12 PM, Kyotaro HORIGUCHI
> >>>> <horiguchi.kyot...@lab.ntt.co.jp> wrote:
> >>>>> At Tue, 18 Apr 2017 18:40:56 +0200, Petr Jelinek 
> >>>>> <petr.jeli...@2ndquadrant.com> wrote in 
> >>>>> <f64d87d1-bef3-5e3e-a999-ba302816a...@2ndquadrant.com>
> >>>>>> On 18/04/17 18:14, Peter Eisentraut wrote:
> >>>>>>> On 4/18/17 11:59, Petr Jelinek wrote:
> >>>>>>>> Hmm if we create hashtable for this, I'd say create hashtable for the
> >>>>>>>> whole table_states then. The reason why it's list now was that it 
> >>>>>>>> seemed
> >>>>>>>> unnecessary to have hashtable when it will be empty almost always but
> >>>>>>>> there is no need to have both hashtable + list IMHO.
> >>>>>
> >>>>> I understand that, but like Peter I also don't like the frequent
> >>>>> palloc/pfree in a long-lasting context and the double loop.
> >>>>>
> >>>>>>> The difference is that we blow away the list of states when the 
> >>>>>>> catalog
> >>>>>>> changes, but we keep the hash table with the start times around.  We
> >>>>>>> need two things with different life times.
> >>>>>
> >>>>> On the other hand, a hash seems overdone. In addition, the
> >>>>> hash version leaks stale entries while subscriptions are
> >>>>> modified, and vacuuming them is costly.
> >>>>>
> >>>>>> Why can't we just update the hashtable based on the catalog? I mean 
> >>>>>> once
> >>>>>> the record is not needed in the list, the table has been synced so 
> >>>>>> there
> >>>>>> is no need for the timestamp either since we'll not try to start the
> >>>>>> worker again.
> >>>>
> >>>> I guess the table sync worker for the same table could need to be
> >>>> started again. For example, imagine a case where a table
> >>>> belonging to the publication is removed from it and the corresponding
> >>>> subscription is refreshed, and then the table is added to it again. We
> >>>> would still have the record of the table with its timestamp in the
> >>>> hash table from the first table sync, so the table sync after the
> >>>> refresh could have to wait for the interval.
> >>>>
> >>>
> >>> But why do we want to wait in such case where user has explicitly
> >>> requested refresh?
> >>>
> >>
> >> Yeah, sorry, I meant that we don't want to wait but cannot launch the
> >> tablesync worker in such case.
> >>
> >> But after more thought, the latest patch Peter proposed has the same
> >> problem. Perhaps we always need to scan the whole pg_subscription_rel
> >> and remove the entry once the corresponding table gets synced.
> >>
> >
> > Yes, that's what I mean by "Why can't we just update the hashtable based
> > on the catalog". And if we do that, then I don't understand why we
> > need both a hashtable and a linked list if we need to update both based
> > on catalog reads anyway.
> 
> Thanks, I've now understood correctly. Yes, I think you're right. If
> we update the hash table based on the catalog whenever table state is
> invalidated, we don't need to have both hash table and list.

Ah, OK. The patch from Peter still generates and replaces the
content of the list. The attached patch stores everything in
SubscriptionRelState. Contrary to my expectation, the hash can
effectively be kept small, and it is destroyed once no relations
remain to be synced.
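
To illustrate the idea outside the PostgreSQL tree, here is a minimal
standalone sketch of the mark-and-sweep plus restart throttling that the
patch does. It is not the patch code: RelEntry, the fixed-size array
standing in for the dynahash table, and the function names are all
hypothetical, and timestamps are plain milliseconds.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_RELS 8

/* Hypothetical stand-in for SubscriptionRelState; not the real struct. */
typedef struct RelEntry
{
	uint32_t	relid;
	int64_t		last_worker_start;	/* ms; 0 means "never started" */
	bool		alive;				/* seen in the latest catalog scan */
	bool		used;				/* slot is occupied */
} RelEntry;

/* Fixed-size array used here in place of a real hash table. */
static RelEntry table[MAX_RELS];

/* Look up relid, creating a fully initialized entry if it is missing. */
static RelEntry *
find_or_add(uint32_t relid)
{
	RelEntry   *free_slot = NULL;

	for (int i = 0; i < MAX_RELS; i++)
	{
		if (table[i].used && table[i].relid == relid)
			return &table[i];
		if (!table[i].used && free_slot == NULL)
			free_slot = &table[i];
	}
	assert(free_slot != NULL);
	free_slot->used = true;
	free_slot->relid = relid;
	free_slot->last_worker_start = 0;
	free_slot->alive = false;
	return free_slot;
}

/* Mark phase: flag every relation that the "catalog" still lists. */
static int
refresh_from_catalog(const uint32_t *relids, int n)
{
	for (int i = 0; i < n; i++)
		find_or_add(relids[i])->alive = true;
	return n;
}

/* Sweep phase: drop unmarked entries, reset the mark on the survivors. */
static int
sweep(void)
{
	int			kept = 0;

	for (int i = 0; i < MAX_RELS; i++)
	{
		if (!table[i].used)
			continue;
		if (!table[i].alive)
		{
			table[i].used = false;	/* forgets last_worker_start too */
			continue;
		}
		table[i].alive = false;
		kept++;
	}
	return kept;
}

/* Allow a launch only if the retry interval has passed since the last one. */
static bool
try_launch(uint32_t relid, int64_t now_ms, int64_t retry_interval_ms)
{
	RelEntry   *e = find_or_add(relid);

	if (e->last_worker_start != 0 &&
		now_ms - e->last_worker_start < retry_interval_ms)
		return false;
	e->last_worker_start = now_ms;
	return true;
}
```

Because the sweep forgets the start time together with the entry, a
table that is dropped from the subscription and added back later is not
held back by the old timestamp, which is the case Sawada-san raised.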

> BTW, in current HEAD the SUBREL_STATE_SYNCWAIT is not stored in the
> pg_subscription_catalog. So the following condition seems not correct.
> We should use "syncworker->relstate == SUBSCRIPTION_STATE_SYNCWAIT"
> instead?

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From b0ce46d4973465ba2ae7550da2295b31945a73b1 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp>
Date: Fri, 21 Apr 2017 17:26:09 +0900
Subject: [PATCH] Wait between tablesync worker restarts

Before restarting a tablesync worker for the same relation, wait
wal_retrieve_retry_interval.  This avoids restarting failing workers in
a tight loop.
---
 src/backend/catalog/pg_subscription.c       |  31 +++++---
 src/backend/replication/logical/tablesync.c | 106 +++++++++++++++++++---------
 src/include/catalog/pg_subscription_rel.h   |   6 +-
 3 files changed, 97 insertions(+), 46 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index a183850..4300f10 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -31,11 +31,11 @@
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
+#include "utils/hsearch.h"
 #include "utils/pg_lsn.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
-
 static List *textarray_to_stringlist(ArrayType *textarray);
 
 /*
@@ -450,17 +450,17 @@ GetSubscriptionRelations(Oid subid)
 /*
  * Get all relations for subscription that are not in a ready state.
  *
- * Returned list is palloced in current memory context.
+ * Returns the number of live entries
  */
-List *
-GetSubscriptionNotReadyRelations(Oid subid)
+int
+GetSubscriptionNotReadyRelations(struct HTAB *localstate, Oid subid)
 {
-	List	   *res = NIL;
 	Relation	rel;
 	HeapTuple	tup;
 	int			nkeys = 0;
 	ScanKeyData	skey[2];
 	SysScanDesc	scan;
+	int			ret = 0;
 
 	rel = heap_open(SubscriptionRelRelationId, AccessShareLock);
 
@@ -481,20 +481,29 @@ GetSubscriptionNotReadyRelations(Oid subid)
 	{
 		Form_pg_subscription_rel	subrel;
 		SubscriptionRelState	   *relstate;
+		bool						found = false;
 
 		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
 
-		relstate = (SubscriptionRelState *)palloc(sizeof(SubscriptionRelState));
-		relstate->relid = subrel->srrelid;
-		relstate->state = subrel->srsubstate;
-		relstate->lsn = subrel->srsublsn;
+		relstate  = hash_search(localstate, (void *) &subrel->srrelid,
+								HASH_ENTER, &found);
 
-		res = lappend(res, relstate);
+		if (!found) {
+			relstate->relid = subrel->srrelid;
+			relstate->state = subrel->srsubstate;
+			relstate->lsn = subrel->srsublsn;
+			relstate->last_worker_start = 0;
+		}
+
+		/* False for old entries. Set as alive since found in the catalog. */
+		Assert(!found || !relstate->alive);
+		relstate->alive = true;
+		ret++;
 	}
 
 	/* Cleanup */
 	systable_endscan(scan);
 	heap_close(rel, AccessShareLock);
 
-	return res;
+	return ret;
 }
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d287e95..09c75a5 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -243,7 +243,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  *
  * If there are tables that need synchronizing and are not being synchronized
  * yet, start sync workers for them (if there are free slots for sync
- * workers).
+ * workers).  To prevent starting the sync worker for the same relation at a
+ * high frequency after a failure, we store its last start time with each sync
+ * state info.  We start the sync worker for the same relation after waiting
+ * at least wal_retrieve_retry_interval.
  *
  * For tables that are being synchronized already, check if sync workers
  * either need action from the apply worker or have finished.
@@ -258,50 +261,77 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
  * If the synchronization position is reached, then the table can be marked as
  * READY and is no longer tracked.
  */
+
 static void
 process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 {
-	static List *table_states = NIL;
-	ListCell   *lc;
+	static HTAB			   *subrel_local_state = NULL;
+	HASH_SEQ_STATUS			hash_seq;
+	SubscriptionRelState   *rstate;
+	bool					local_state_updated = false;
+	int						nrels = 0;
 
 	Assert(!IsTransactionState());
 
 	/* We need up to date sync state info for subscription tables here. */
 	if (!table_states_valid)
 	{
-		MemoryContext	oldctx;
-		List		   *rstates;
-		ListCell	   *lc;
-		SubscriptionRelState *rstate;
-
-		/* Clean the old list. */
-		list_free_deep(table_states);
-		table_states = NIL;
+		/* Create the local state hash if it does not exist yet */
+		if (!subrel_local_state)
+		{
+			HASHCTL		ctl;
+
+			memset(&ctl, 0, sizeof(ctl));
+			ctl.keysize = sizeof(Oid);
+			ctl.entrysize = sizeof(SubscriptionRelState);
+			subrel_local_state =
+				hash_create("Logical replication table sync worker start times",
+							256, &ctl, HASH_ELEM | HASH_BLOBS);
+		}
 
 		StartTransactionCommand();
 
-		/* Fetch all non-ready tables. */
-		rstates	= GetSubscriptionNotReadyRelations(MySubscription->oid);
+		/* Update local state hash with non-ready tables. */
+		nrels = GetSubscriptionNotReadyRelations(subrel_local_state,
+												 MySubscription->oid);
+		local_state_updated = true;
+		CommitTransactionCommand();
 
-		/* Allocate the tracking info in a permanent memory context. */
-		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-		foreach(lc, rstates)
+		table_states_valid = true;
+	}
+	/* Just count the entries if the local state was not updated this time */
+	else if (subrel_local_state != NULL)
+		nrels = hash_get_num_entries(subrel_local_state);
+
+	/* No relations to be synced */
+	if (nrels == 0)
+	{
+		if (subrel_local_state != NULL)
 		{
-			rstate = palloc(sizeof(SubscriptionRelState));
-			memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
-			table_states = lappend(table_states, rstate);
+			hash_destroy(subrel_local_state);
+			subrel_local_state = NULL;
 		}
-		MemoryContextSwitchTo(oldctx);
-
-		CommitTransactionCommand();
 
-		table_states_valid = true;
+		return;
 	}
 
-	/* Process all tables that are being synchronized. */
-	foreach(lc, table_states)
+	/* Process all tables that are to be synchronized. */
+	hash_seq_init(&hash_seq, subrel_local_state);
+
+	while ((rstate = hash_seq_search(&hash_seq)) != NULL)
 	{
-		SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc);
+		/*
+		 * Remove entries no longer required. If subrel_local_state is not
+		 * updated above, this flag is ignored.
+		 */
+		if (local_state_updated && !rstate->alive)
+		{
+			hash_search(subrel_local_state, &rstate->relid, HASH_REMOVE, NULL);
+			continue;
+		}
+
+		/* Reset in preparation for the next update of this hash entry */
+		rstate->alive = false;
 
 		if (rstate->state == SUBREL_STATE_SYNCDONE)
 		{
@@ -395,17 +425,25 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 			}
 
 			/*
-			 * If there is no sync worker registered for the table and
-			 * there is some free sync worker slot, start new sync worker
-			 * for the table.
+			 * If there is no sync worker registered for the table and there
+			 * is some free sync worker slot, start new sync worker for the
+			 * table.
 			 */
 			else if (!syncworker && nsyncworkers < max_sync_workers_per_subscription)
 			{
-				logicalrep_worker_launch(MyLogicalRepWorker->dbid,
-										 MySubscription->oid,
-										 MySubscription->name,
-										 MyLogicalRepWorker->userid,
-										 rstate->relid);
+				TimestampTz	now = GetCurrentTimestamp();
+
+				/* Keep moderate intervals */
+				if (TimestampDifferenceExceeds(rstate->last_worker_start, now,
+											   wal_retrieve_retry_interval))
+				{
+					logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+											 MySubscription->oid,
+											 MySubscription->name,
+											 MyLogicalRepWorker->userid,
+											 rstate->relid);
+					rstate->last_worker_start = now;
+				}
 			}
 		}
 	}
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 9f4f152..49c4723 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -66,8 +66,12 @@ typedef struct SubscriptionRelState
 	Oid			relid;
 	XLogRecPtr	lsn;
 	char		state;
+	TimestampTz last_worker_start;	/* time of the last launch of worker */
+	bool		alive;				/* this relid is alive in the catalog */
 } SubscriptionRelState;
 
+struct HTAB;
+
 extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state,
 								   XLogRecPtr sublsn);
 extern char GetSubscriptionRelState(Oid subid, Oid relid,
@@ -75,6 +79,6 @@ extern char GetSubscriptionRelState(Oid subid, Oid relid,
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
 extern List *GetSubscriptionRelations(Oid subid);
-extern List *GetSubscriptionNotReadyRelations(Oid subid);
+extern int GetSubscriptionNotReadyRelations(struct HTAB *localstate, Oid subid);
 
 #endif   /* PG_SUBSCRIPTION_REL_H */
-- 
2.9.2
