On 21/04/17 16:31, Petr Jelinek wrote:
> On 21/04/17 16:23, Peter Eisentraut wrote:
>> On 4/21/17 10:11, Petr Jelinek wrote:
>>> On 21/04/17 16:09, Peter Eisentraut wrote:
>>>> On 4/20/17 14:29, Petr Jelinek wrote:
>>>>> +         /* Find unused worker slot. */
>>>>> +         if (!w->in_use)
>>>>>           {
>>>>> -                 worker = &LogicalRepCtx->workers[slot];
>>>>> -                 break;
>>>>> +                 worker = w;
>>>>> +                 slot = i;
>>>>> +         }
>>>>
>>>> Doesn't this still need a break?  Otherwise it always picks the last slot.
>>>>
>>>
>>> Yes it will pick the last slot, does that matter though, is the first
>>> one better somehow?
>>>
>>> We can't break because we also need to continue the counter (I think the
>>> issue that the counter solves is probably just theoretical, but still).
>>
>> I see.  I think the code would be less confusing if we break the loop
>> like before and call logicalrep_sync_worker_count() separately.
>>
>>> Hmm actually, maybe the if (!w->in_use) should be if (worker == NULL &&
>>> !w->in_use)?
>>
>> That would also do it.  But it's getting a bit fiddly.
>>
> 
> I just wanted to avoid looping twice, especially since the garbage
> collecting code has to also do the loop. I guess I'll go with my
> original coding for this then which was to put retry label above the
> loop first, then try finding worker slot, if found call the
> logicalrep_sync_worker_count and if not found do the garbage collection
> and if we cleaned up something then goto retry.
> 

Here is the patch doing just that.

-- 
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
From fd16e40c870fc08b76ba823e75d6126361bb0ca6 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmo...@pjmodos.net>
Date: Wed, 19 Apr 2017 03:48:53 +0200
Subject: [PATCH] Fix various concurrency issues in logical replication worker
 launching

The code was originally written with the assumption that the launcher is
the only process starting the worker. However, that hasn't been true since
commit 7c4f52409, which failed to modify the worker management code
adequately.

This patch adds an in_use field to the LogicalRepWorker struct to indicate
whether the worker slot is being used, and uses proper locking everywhere
this flag is set or read.

However, if the parent process dies while the new worker is starting and
the new worker fails to attach to shared memory, this flag would never get
cleared. We solve this rare corner case by adding a sort of garbage
collector for in_use slots. This uses another field in the
LogicalRepWorker struct named launch_time, which contains the timestamp of
when the worker was started. If any request to start a new worker does not
find a free slot, we check for workers that were supposed to start but
took too long to actually do so, and reuse their slots.

In passing, also fix possible race conditions when stopping a worker that
hasn't finished starting yet.
---
 src/backend/replication/logical/launcher.c | 162 +++++++++++++++++++++++------
 src/include/replication/worker_internal.h  |   9 ++
 2 files changed, 138 insertions(+), 33 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c 
b/src/backend/replication/logical/launcher.c
index f6ae610..b55ac2a 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -38,6 +38,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/slot.h"
+#include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 
 #include "storage/ipc.h"
@@ -76,6 +77,7 @@ static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
+static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
 
 /* Flags set by signal handlers */
 volatile sig_atomic_t got_SIGHUP = false;
@@ -154,15 +156,19 @@ get_subscription_list(void)
 /*
  * Wait for a background worker to start up and attach to the shmem context.
  *
- * This is like WaitForBackgroundWorkerStartup(), except that we wait for
- * attaching, not just start and we also just exit if postmaster died.
+ * This is only needed for cleaning up the shared memory in case the worker
+ * fails to attach.
  */
-static bool
+static void
 WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                                                           
BackgroundWorkerHandle *handle)
 {
        BgwHandleStatus status;
        int                     rc;
+       uint16          generation;
+
+       /* Remember generation for future identification. */
+       generation = worker->generation;
 
        for (;;)
        {
@@ -170,18 +176,29 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 
                CHECK_FOR_INTERRUPTS();
 
+               LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+               /* Worker either died or started, no need to do anything. */
+               if (!worker->in_use || worker->proc)
+               {
+                       LWLockRelease(LogicalRepWorkerLock);
+                       return;
+               }
+
+               LWLockRelease(LogicalRepWorkerLock);
+
+               /* Check if worker has died before attaching and clean up after 
it. */
                status = GetBackgroundWorkerPid(handle, &pid);
 
-               /*
-                * Worker started and attached to our shmem. This check is safe
-                * because only launcher ever starts the workers, so nobody can 
steal
-                * the worker slot.
-                */
-               if (status == BGWH_STARTED && worker->proc)
-                       return true;
-               /* Worker didn't start or died before attaching to our shmem. */
                if (status == BGWH_STOPPED)
-                       return false;
+               {
+                       LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+                       /* Ensure that this was indeed the worker we waited 
for. */
+                       if (generation == worker->generation)
+                               logicalrep_worker_cleanup(worker);
+                       LWLockRelease(LogicalRepWorkerLock);
+                       return;
+               }
 
                /*
                 * We need timeout because we generally don't get notified via 
latch
@@ -197,7 +214,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
                ResetLatch(MyLatch);
        }
 
-       return false;
+       return;
 }
 
 /*
@@ -216,8 +233,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool 
only_running)
        for (i = 0; i < max_logical_replication_workers; i++)
        {
                LogicalRepWorker   *w = &LogicalRepCtx->workers[i];
-               if (w->subid == subid && w->relid == relid &&
-                       (!only_running || (w->proc && 
IsBackendPid(w->proc->pid))))
+               if (w->in_use && w->subid == subid && w->relid == relid &&
+                       (!only_running || (w->proc)))
                {
                        res = w;
                        break;
@@ -236,8 +253,11 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char 
*subname, Oid userid,
 {
        BackgroundWorker        bgw;
        BackgroundWorkerHandle *bgw_handle;
+       int                                     i;
        int                                     slot;
        LogicalRepWorker   *worker = NULL;
+       int                                     nsyncworkers = 0;
+       TimestampTz                     now = GetCurrentTimestamp();
 
        ereport(LOG,
                        (errmsg("starting logical replication worker for 
subscription \"%s\"",
@@ -255,17 +275,72 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char 
*subname, Oid userid,
         */
        LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
+retry:
        /* Find unused worker slot. */
-       for (slot = 0; slot < max_logical_replication_workers; slot++)
+       for (i = 0; i < max_logical_replication_workers; i++)
        {
-               if (!LogicalRepCtx->workers[slot].proc)
+               LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+               /* Find unused worker slot. */
+               if (!w->in_use)
                {
-                       worker = &LogicalRepCtx->workers[slot];
+                       worker = w;
+                       slot = i;
                        break;
                }
        }
 
-       /* Bail if not found */
+       nsyncworkers = logicalrep_sync_worker_count(subid);
+
+       /*
+        * If we didn't find free slot, try to do garbage collection.
+        * The reason we do this is because if some worker failed to start up
+        * and its parent has crashed while waiting, the in_use state was never
+        * cleared.
+        */
+       if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+       {
+               bool    did_cleanup = false;
+
+               for (i = 0; i < max_logical_replication_workers; i++)
+               {
+                       LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+                       /*
+                        * If the worker was marked in use but didn't manage to 
attach
+                        * in time, clean it up.
+                        */
+                       if (w->in_use && !w->proc &&
+                               TimestampDifferenceExceeds(w->launch_time, now,
+                                                                               
   wal_receiver_timeout))
+                       {
+                               elog(WARNING,
+                                        "logical replication worker for 
subscription \"%u\" took too long to start; canceled",
+                                        w->subid);
+
+                               logicalrep_worker_cleanup(w);
+                               did_cleanup = true;
+                       }
+               }
+
+               if (did_cleanup)
+                       goto retry;
+       }
+
+       /*
+        * If we reached sync worker limit per subscription just exit silently
+        * as we might get here due to otherwise harmless race condition.
+        */
+       if (nsyncworkers >= max_sync_workers_per_subscription)
+       {
+               LWLockRelease(LogicalRepWorkerLock);
+               return;
+       }
+
+       /*
+        * However if there are no more free worker slots, inform user about it
+        * before exiting.
+        */
        if (worker == NULL)
        {
                LWLockRelease(LogicalRepWorkerLock);
@@ -276,7 +351,10 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char 
*subname, Oid userid,
                return;
        }
 
-       /* Prepare the worker info. */
+       /* Prepare the worker slot. */
+       worker->launch_time = now;
+       worker->in_use = true;
+       worker->generation++;
        worker->proc = NULL;
        worker->dbid = dbid;
        worker->userid = userid;
@@ -331,6 +409,7 @@ void
 logicalrep_worker_stop(Oid subid, Oid relid)
 {
        LogicalRepWorker *worker;
+       uint16  generation;
 
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
@@ -344,10 +423,16 @@ logicalrep_worker_stop(Oid subid, Oid relid)
        }
 
        /*
+        * Remember which generation was our worker so we can check if what we
+        * see is still the same one.
+        */
+       generation = worker->generation;
+
+       /*
         * If we found worker but it does not have proc set it is starting up,
         * wait for it to finish and then kill it.
         */
-       while (worker && !worker->proc)
+       while (worker->in_use && !worker->proc)
        {
                int     rc;
 
@@ -370,10 +455,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
                /*
-                * Worker is no longer associated with subscription.  It must 
have
-                * exited, nothing more for us to do.
+                * Check if worker slot is no longer used which would mean that 
the
+                * worker has exited, or that the worker generation is different
+                * meaning that different worker has taken the slot.
                 */
-               if (worker->subid == InvalidOid)
+               if (!worker->in_use || worker->generation != generation)
                {
                        LWLockRelease(LogicalRepWorkerLock);
                        return;
@@ -394,7 +480,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
                int     rc;
 
                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-               if (!worker->proc)
+               if (!worker->proc || worker->generation != generation)
                {
                        LWLockRelease(LogicalRepWorkerLock);
                        break;
@@ -474,15 +560,26 @@ logicalrep_worker_detach(void)
        /* Block concurrent access. */
        LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
-       MyLogicalRepWorker->dbid = InvalidOid;
-       MyLogicalRepWorker->userid = InvalidOid;
-       MyLogicalRepWorker->subid = InvalidOid;
-       MyLogicalRepWorker->proc = NULL;
+       logicalrep_worker_cleanup(MyLogicalRepWorker);
 
        LWLockRelease(LogicalRepWorkerLock);
 }
 
 /*
+ * Cleanup worker info.
+ */
+static void
+logicalrep_worker_cleanup(LogicalRepWorker *worker)
+{
+       worker->in_use = false;
+       worker->proc = NULL;
+       worker->dbid = InvalidOid;
+       worker->userid = InvalidOid;
+       worker->subid = InvalidOid;
+       worker->relid = InvalidOid;
+}
+
+/*
  * Cleanup function for logical replication launcher.
  *
  * Called on logical replication launcher exit.
@@ -732,12 +829,11 @@ ApplyLauncherMain(Datum main_arg)
 
                                if (sub->enabled && w == NULL)
                                {
-                                       logicalrep_worker_launch(sub->dbid, 
sub->oid, sub->name,
-                                                                               
         sub->owner, InvalidOid);
                                        last_start_time = now;
                                        wait_time = wal_retrieve_retry_interval;
-                                       /* Limit to one worker per mainloop 
cycle. */
-                                       break;
+
+                                       logicalrep_worker_launch(sub->dbid, 
sub->oid, sub->name,
+                                                                               
         sub->owner, InvalidOid);
                                }
                        }
 
diff --git a/src/include/replication/worker_internal.h 
b/src/include/replication/worker_internal.h
index b8e35d4..0535794 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -21,6 +21,15 @@
 
 typedef struct LogicalRepWorker
 {
+       /* Time at which this worker was launched. */
+       TimestampTz     launch_time;
+
+       /* Indicates if this slot is used or free. */
+       bool    in_use;
+
+       /* Increased every time the slot is taken by new worker. */
+       uint16  generation;
+
        /* Pointer to proc array. NULL if not running. */
        PGPROC *proc;
 
-- 
2.7.4

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to