On 17/04/17 20:09, Peter Eisentraut wrote: > On 4/16/17 22:01, Noah Misch wrote: >> This PostgreSQL 10 open item is past due for your status update. Kindly send >> a status update within 24 hours, and include a date for your subsequent >> status >> update. Refer to the policy on open item ownership: >> https://www.postgresql.org/message-id/20170404140717.GA2675809%40tornado.leadboat.com > > I think we're not really sure yet what to do about this. Discussion is > ongoing. I'll report back on Wednesday. >
So my idea was to add some kind of inuse flag. This turned out to be bit more complicated in terms of how to clean it than I would have hoped. This is due to the fact that there is no way to reliably tell if worker has failed to start if the parent worker crashed while waiting. My solution to that is to use similar logic to autovacuum where we use timeout for worker to attach to shmem. We do this only if there is no free slot found when launch of replication worker was requested. While working on this patch I also noticed other subtle concurrency issues and fixed them as well - stopping worker that hasn't finished starting yet wasn't completely concurrency safe and limiting sync workers per subscription theoretically wasn't either (although I don't think it could happen in practice). I do wonder now though if it's such a good idea to have the BackgroundWorkerHandle private to the bgworker.c given that this is the 3rd time (twice before was outside of postgres core) I had to write similar generation mechanism that it uses for unique bgworker authentication inside the process which started it. It would have been much easier if I could just save the BackgroundWorkerHandle itself to shmem so it could be used across processes instead of having to reinvent it every time. -- Petr Jelinek http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Training & Services
From 6d32379cda7d9f69c42185b9684d32570554f4e3 Mon Sep 17 00:00:00 2001 From: Petr Jelinek <pjmo...@pjmodos.net> Date: Wed, 19 Apr 2017 03:48:53 +0200 Subject: [PATCH] Fix various concurrency issues in logical replication worker launching The code was originally written with assumption that launcher is the only proccess starting the worker. However that hasn't been true since commit 7c4f52409 which failed to modify the worker management code adequately. This patch adds in_use field to LogicalRepWorker struct to indicate if the worker slot is being used and uses proper locking everywhere this flag is set or read. However if the parent process which dies while the new worker is starting and the new worker failes to attach to shared memory, this flag would never get cleared. We solve this rare corner case by adding sort of garbage collector for in_use slots. This uses another filed in the LogicalRepWorker struct named launch_time which contains timestamp of when the worker has been started. If any request to start a new worker does not find free slot, we'll check for workers that were supposed to start but took long to actually do so and reuse their slot. In passing also fix possible race conditions when stopping worker that hasn't finished starting yet. --- src/backend/replication/logical/launcher.c | 167 +++++++++++++++++++++++------ src/include/replication/worker_internal.h | 9 ++ 2 files changed, 141 insertions(+), 35 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index f6ae610..41f8cfe 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -38,6 +38,7 @@ #include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "replication/slot.h" +#include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "storage/ipc.h" @@ -76,6 +77,7 @@ static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); +static void logicalrep_worker_cleanup(LogicalRepWorker *worker); /* Flags set by signal handlers */ volatile sig_atomic_t got_SIGHUP = false; @@ -154,15 +156,19 @@ get_subscription_list(void) /* * Wait for a background worker to start up and attach to the shmem context. * - * This is like WaitForBackgroundWorkerStartup(), except that we wait for - * attaching, not just start and we also just exit if postmaster died. + * This is only needed for cleaning up the shared memory in case the worker + * fails to attach. */ -static bool +static void WaitForReplicationWorkerAttach(LogicalRepWorker *worker, BackgroundWorkerHandle *handle) { BgwHandleStatus status; int rc; + uint16 generation; + + /* Remember generation for future identification. */ + generation = worker->generation; for (;;) { @@ -170,18 +176,29 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, CHECK_FOR_INTERRUPTS(); + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + /* Worker either died or started, no need to do anything. */ + if (!worker->in_use || worker->proc) + { + LWLockRelease(LogicalRepWorkerLock); + return; + } + + LWLockRelease(LogicalRepWorkerLock); + + /* Check if worker has died before attaching and clean up after it. */ status = GetBackgroundWorkerPid(handle, &pid); - /* - * Worker started and attached to our shmem. This check is safe - * because only launcher ever starts the workers, so nobody can steal - * the worker slot. - */ - if (status == BGWH_STARTED && worker->proc) - return true; - /* Worker didn't start or died before attaching to our shmem. */ if (status == BGWH_STOPPED) - return false; + { + LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + /* Ensure that this was indeed the worker we waited for. */ + if (generation == worker->generation) + logicalrep_worker_cleanup(worker); + LWLockRelease(LogicalRepWorkerLock); + return; + } /* * We need timeout because we generally don't get notified via latch @@ -197,7 +214,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, ResetLatch(MyLatch); } - return false; + return; } /* @@ -216,8 +233,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) for (i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->subid == subid && w->relid == relid && - (!only_running || (w->proc && IsBackendPid(w->proc->pid)))) + if (w->in_use && w->subid == subid && w->relid == relid && + (!only_running || (w->proc))) { res = w; break; @@ -236,8 +253,10 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; + int i; int slot; LogicalRepWorker *worker = NULL; + int nsyncworkers = 0; ereport(LOG, (errmsg("starting logical replication worker for subscription \"%s\"", @@ -255,17 +274,74 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, */ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); - /* Find unused worker slot. */ - for (slot = 0; slot < max_logical_replication_workers; slot++) + for (i = 0; i < max_logical_replication_workers; i++) { - if (!LogicalRepCtx->workers[slot].proc) + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + /* Find unused worker slot. */ + if (!w->in_use) { - worker = &LogicalRepCtx->workers[slot]; - break; + worker = w; + slot = i; } + /* Also count number of sync workers. */ + else if (relid != InvalidOid && w->subid == subid && + w->relid != InvalidOid) + nsyncworkers++; } - /* Bail if not found */ + /* + * If we didn't find free slot, try to do garbage collection. + * The reason we do this is because if some worker failed to start up + * and it's parent has crashed while waiting, the in_use state was never + * cleared. + */ + if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) + { + TimestampTz now = GetCurrentTimestamp(); + + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + /* + * If the worker was marked in use but didn't manage to attach + * in time, clean it up. + */ + if (w->in_use && !w->proc && + TimestampDifferenceExceeds(w->launch_time, now, + wal_receiver_timeout)) + { + elog(WARNING, + "logical replication worker for subscription \"%d\" took too long to start; canceled", + worker->subid); + + logicalrep_worker_cleanup(worker); + worker = w; + slot = i; + + /* If this was sync worker, decrese the count. */ + if (relid != InvalidOid && w->subid == subid && + w->relid != InvalidOid) + nsyncworkers--; + } + } + } + + /* + * If we reached sync worker limit per subscription just exit silently + * as we might get here due to otherwise harmless race condition. + */ + if (nsyncworkers >= max_sync_workers_per_subscription) + { + LWLockRelease(LogicalRepWorkerLock); + return; + } + + /* + * However if there are no more free worker slots, inform user about it + * before exiting. + */ if (worker == NULL) { LWLockRelease(LogicalRepWorkerLock); @@ -276,7 +352,10 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, return; } - /* Prepare the worker info. */ + /* Prepare the worker slot. */ + TIMESTAMP_NOBEGIN(worker->launch_time); + worker->in_use = true; + worker->generation++; worker->proc = NULL; worker->dbid = dbid; worker->userid = userid; @@ -331,6 +410,7 @@ void logicalrep_worker_stop(Oid subid, Oid relid) { LogicalRepWorker *worker; + uint16 generation; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); @@ -344,10 +424,16 @@ logicalrep_worker_stop(Oid subid, Oid relid) } /* + * Remember which generation was our worker so we can check if what we + * see is still the same one. + */ + generation = worker->generation; + + /* * If we found worker but it does not have proc set it is starting up, * wait for it to finish and then kill it. */ - while (worker && !worker->proc) + while (worker->in_use && !worker->proc) { int rc; @@ -370,10 +456,11 @@ logicalrep_worker_stop(Oid subid, Oid relid) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); /* - * Worker is no longer associated with subscription. It must have - * exited, nothing more for us to do. + * Check if worker slot is no longer used which would mean that the + * worker has exited, or that the worker generation is different + * meaning that different worker has taken the slot. */ - if (worker->subid == InvalidOid) + if (!worker->in_use || worker->generation != generation) { LWLockRelease(LogicalRepWorkerLock); return; @@ -394,7 +481,7 @@ logicalrep_worker_stop(Oid subid, Oid relid) int rc; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - if (!worker->proc) + if (!worker->proc || worker->generation != generation) { LWLockRelease(LogicalRepWorkerLock); break; @@ -474,15 +561,26 @@ logicalrep_worker_detach(void) /* Block concurrent access. */ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); - MyLogicalRepWorker->dbid = InvalidOid; - MyLogicalRepWorker->userid = InvalidOid; - MyLogicalRepWorker->subid = InvalidOid; - MyLogicalRepWorker->proc = NULL; + logicalrep_worker_cleanup(MyLogicalRepWorker); LWLockRelease(LogicalRepWorkerLock); } /* + * Cleanup worker info. + */ +static void +logicalrep_worker_cleanup(LogicalRepWorker *worker) +{ + worker->in_use = false; + worker->proc = NULL; + worker->dbid = InvalidOid; + worker->userid = InvalidOid; + worker->subid = InvalidOid; + worker->relid = InvalidOid; +} + +/* * Cleanup function for logical replication launcher. * * Called on logical replication launcher exit. @@ -732,12 +830,11 @@ ApplyLauncherMain(Datum main_arg) if (sub->enabled && w == NULL) { - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid); last_start_time = now; wait_time = wal_retrieve_retry_interval; - /* Limit to one worker per mainloop cycle. */ - break; + + logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid); } } diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index b8e35d4..0535794 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -21,6 +21,15 @@ typedef struct LogicalRepWorker { + /* Time at which this worker was launche. */ + TimestampTz launch_time; + + /* Indicates if this slot is used or free. */ + bool in_use; + + /* Increased everytime the slot is taken by new worker. */ + uint16 generation; + /* Pointer to proc array. NULL if not running. */ PGPROC *proc; -- 2.7.4
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers