On Wed, Nov 5, 2014 at 9:26 PM, Robert Haas <[email protected]> wrote:
> Yes, I think that's probably a net improvement in robustness quite
> apart from what we decide to do about any of the rest of this. I've
> attached it here as revise-procglobal-tracking.patch and will commit
> that bit if nobody objects. The remainder is reattached without
> change as group-locking-v0.1.patch.
>
> Per your other comment, I've developed the beginnings of a testing
> framework which I attached here as test_group_locking-v0.patch.
Urk. I attached a version with some stupid bugs. Here's a slightly better one.
--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/contrib/test_group_locking/Makefile b/contrib/test_group_locking/Makefile
new file mode 100644
index 0000000..2d09341
--- /dev/null
+++ b/contrib/test_group_locking/Makefile
@@ -0,0 +1,21 @@
+# contrib/test_group_locking/Makefile
+#
+# Builds the test_group_locking extension: a loadable C module plus its
+# extension install script and control file.
+
+MODULE_big = test_group_locking
+OBJS = test_group_locking.o $(WIN32RES)
+PGFILEDESC = "test_group_locking - test harness for group locking"
+
+EXTENSION = test_group_locking
+DATA = test_group_locking--1.0.sql
+
+# NOTE(review): REGRESS expects sql/test_group_locking.sql and
+# expected/test_group_locking.out, which this patch does not appear to add.
+REGRESS = test_group_locking
+
+# Support both out-of-tree builds (PGXS) and builds inside the source tree.
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/test_group_locking
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/test_group_locking/test_group_locking--1.0.sql b/contrib/test_group_locking/test_group_locking--1.0.sql
new file mode 100644
index 0000000..adb2be5
--- /dev/null
+++ b/contrib/test_group_locking/test_group_locking--1.0.sql
@@ -0,0 +1,8 @@
+/* contrib/test_group_locking/test_group_locking--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_group_locking" to load this file. \quit
+
+-- test_group_locking(spec): run the group-locking scenario described by spec,
+-- a comma-separated list of steps such as '1.0:lock:AccessShareLock:foo'.
+-- See the comment on the C function test_group_locking() for the syntax.
+CREATE FUNCTION test_group_locking(spec pg_catalog.text)
+ RETURNS pg_catalog.void STRICT
+ AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/test_group_locking/test_group_locking.c b/contrib/test_group_locking/test_group_locking.c
new file mode 100644
index 0000000..904beff
--- /dev/null
+++ b/contrib/test_group_locking/test_group_locking.c
@@ -0,0 +1,1071 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_group_locking.c
+ * Test harness code for group locking.
+ *
+ * Copyright (C) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * contrib/test_group_locking/test_group_locking.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <limits.h>
+
+#include "access/xact.h"
+#include "catalog/namespace.h"
+#include "commands/dbcommands.h"
+#include "fmgr.h"
+#include "lib/ilist.h"
+#include "lib/stringinfo.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "nodes/makefuncs.h"
+#include "parser/scansup.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/procarray.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "utils/builtins.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+/* Standard PostgreSQL loadable-module magic block. */
+PG_MODULE_MAGIC;
+
+/* Declare test_group_locking() as a version-1 SQL-callable function. */
+PG_FUNCTION_INFO_V1(test_group_locking);
+
+/*
+ * Background worker entrypoint. Deliberately non-static: start_worker()
+ * registers workers by library and function name, so the worker process must
+ * be able to look this symbol up.
+ */
+void test_group_locking_worker_main(Datum main_arg);
+
+/*
+ * Names of lock modes, for debug printouts and for parsing lock mode names.
+ * The array is indexed directly by a LOCKMODE value (see send_command and the
+ * worker's TGL_LOCK error message), so its order must match the numbering of
+ * the lock modes; entry 0 is a placeholder that is not a valid lock mode.
+ */
+static const char *const lock_mode_names[] =
+{
+ "INVALID",
+ "AccessShareLock",
+ "RowShareLock",
+ "RowExclusiveLock",
+ "ShareUpdateExclusiveLock",
+ "ShareLock",
+ "ShareRowExclusiveLock",
+ "ExclusiveLock",
+ "AccessExclusiveLock"
+};
+
+/* Operations a worker can be told to perform. */
+typedef enum
+{
+ TGL_START,
+ TGL_STOP,
+ TGL_LOCK,
+ TGL_UNLOCK
+} TestGroupLockOp;
+
+/*
+ * A single command, sent as raw bytes over a worker's request queue.
+ * lockmode and relid are only filled in for TGL_LOCK and TGL_UNLOCK.
+ */
+typedef struct
+{
+ TestGroupLockOp op;
+ LOCKMODE lockmode;
+ Oid relid;
+} TestGroupLockCommand;
+
+/* One step of the parsed test plan; steps are kept in a dlist. */
+typedef struct
+{
+ dlist_node node;
+ bool verified; /* already checked by rationalize_steps_for_task? */
+ int group_id; /* N in N[.M] */
+ int task_id; /* M in N[.M]; 0 when omitted */
+ TestGroupLockCommand command;
+} TestGroupLockStep;
+
+/*
+ * Hash key identifying one worker. It is hashed with tag_hash, i.e. as raw
+ * bytes, and callers fill in only these two fields, so it must not acquire
+ * any padding.
+ */
+typedef struct
+{
+ int group_id;
+ int task_id;
+} worker_key;
+
+/* Per-worker state in the worker hash table; the key must come first. */
+typedef struct
+{
+ worker_key key;
+ dsm_segment *seg;
+ BackgroundWorkerHandle *handle;
+ shm_mq_handle *requesth; /* commands we send to the worker */
+ shm_mq_handle *responseh; /* protocol messages the worker sends back */
+ bool awaiting_response; /* command sent but not yet acknowledged */
+} worker_info;
+
+/* Per-group leadership info, keyed by group_id (which must come first). */
+typedef struct
+{
+ int group_id;
+ int leader_task_id; /* task of the first plan step for this group */
+ bool has_followers; /* does another task share this group_id? */
+} leader_info;
+
+/* Fixed-size data passed via our dynamic shared memory segment. */
+typedef struct worker_fixed_data
+{
+ Oid database_id;
+ Oid authenticated_user_id;
+ NameData database;
+ NameData authenticated_user;
+ bool use_group_locking; /* should the worker join a lock group? */
+ pid_t leader_pid; /* 0 = become the leader; else leader's PID */
+} worker_fixed_data;
+
+/* Size in bytes of each of the request and response queues. */
+#define SHM_QUEUE_SIZE 32768
+/* Magic number identifying our shm_toc. */
+#define TEST_GROUP_LOCKING_MAGIC 0x4c616e65
+
+/* Forward declarations of file-local helpers. */
+static void check_for_messages(HTAB *worker_hash);
+static void determine_leader_info(dlist_head *plan, HTAB *leader_hash);
+static void handle_sigterm(SIGNAL_ARGS);
+static void process_message(HTAB *worker_hash, worker_info *info,
+ char *message, Size message_bytes);
+static void rationalize_steps(dlist_head *plan);
+static void rationalize_steps_for_task(dlist_head *plan, int group_id,
+ int task_id);
+/* report_syntax_error always raises an ERROR and never returns. */
+static void report_syntax_error(StringInfo buf);
+static bool scan_character(StringInfo buf, char c);
+static bool scan_eof(StringInfo buf);
+static char *scan_identifier(StringInfo buf);
+static bool scan_integer(StringInfo buf, int *result);
+static LOCKMODE scan_lockmode(StringInfo buf);
+static TestGroupLockOp scan_op(StringInfo buf);
+static RangeVar *scan_qualified_identifier(StringInfo buf);
+static char *scan_quoted_identifier(StringInfo buf);
+static void send_command(HTAB *worker_hash, TestGroupLockStep *step);
+static void start_worker(HTAB *worker_hash, int group_id, int task_id,
+ int leader_task_id);
+
+/*--------------------------------------------------------------------------
+ * SQL-callable entrypoint: test_group_locking(spec text).
+ *
+ * Runs a scripted locking scenario using background workers.  spec is a
+ * comma-separated list of steps, each in one of these forms:
+ *
+ *		N[.M]:start
+ *		N[.M]:stop
+ *		N[.M]:lock:lockmode:relation
+ *		N[.M]:unlock:lockmode:relation
+ *
+ * N and M are non-negative integers; M defaults to 0 when omitted.  Each
+ * distinct (N, M) pair names one worker, and workers sharing the same N form
+ * one lock group.  lockmode is a lock mode name such as AccessShareLock, and
+ * relation is an optionally schema-qualified relation name.
+ *
+ * A worker with no explicit start step is started before any other step
+ * runs, and a worker with no explicit stop step is stopped after all other
+ * steps.  Steps then execute strictly in order, each one waiting for the
+ * worker to acknowledge it.
+ *--------------------------------------------------------------------------
+ */
+Datum
+test_group_locking(PG_FUNCTION_ARGS)
+{
+	text	   *spec = PG_GETARG_TEXT_PP(0);
+	StringInfo	input = makeStringInfo();
+	dlist_head	plan;
+	dlist_iter	it;
+	HASHCTL		ctl;
+	HTAB	   *workers;
+	HTAB	   *leaders;
+	bool		more_steps = true;
+
+	dlist_init(&plan);
+	appendBinaryStringInfo(input, VARDATA_ANY(spec), VARSIZE_ANY_EXHDR(spec));
+
+	/* Parse the specification into an ordered list of steps. */
+	do
+	{
+		TestGroupLockStep *s = palloc0(sizeof(TestGroupLockStep));
+		TestGroupLockCommand *cmd = &s->command;
+
+		/* Worker identifier N[.M]; task_id stays 0 when ".M" is absent. */
+		if (!scan_integer(input, &s->group_id))
+			report_syntax_error(input);
+		if (scan_character(input, '.') && !scan_integer(input, &s->task_id))
+			report_syntax_error(input);
+
+		/* The operation itself. */
+		if (!scan_character(input, ':'))
+			report_syntax_error(input);
+		cmd->op = scan_op(input);
+
+		/* lock and unlock additionally take ":lockmode:relation". */
+		if (cmd->op == TGL_LOCK || cmd->op == TGL_UNLOCK)
+		{
+			RangeVar   *rel;
+
+			if (!scan_character(input, ':'))
+				report_syntax_error(input);
+			cmd->lockmode = scan_lockmode(input);
+			if (!scan_character(input, ':'))
+				report_syntax_error(input);
+			rel = scan_qualified_identifier(input);
+
+			/*
+			 * Resolve the name without locking the relation, since locking
+			 * is precisely what we want to test.  That is unsafe against
+			 * concurrent DDL, which is acceptable for test code.
+			 */
+			cmd->relid = RangeVarGetRelid(rel, NoLock, false);
+		}
+
+		dlist_push_tail(&plan, &s->node);
+
+		/* Steps are comma-separated; end of input terminates the list. */
+		if (scan_eof(input))
+			more_steps = false;
+		else if (!scan_character(input, ','))
+			report_syntax_error(input);
+	} while (more_steps);
+
+	/* Validate the plan and add implicit start/stop steps. */
+	rationalize_steps(&plan);
+
+	/* Hash table of running workers, keyed by worker_key. */
+	memset(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(worker_key);
+	ctl.entrysize = sizeof(worker_info);
+	ctl.hash = tag_hash;
+	ctl.hcxt = CurrentMemoryContext;
+	workers = hash_create("test_group_locking workers", 16, &ctl,
+						  HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	/* Hash table of per-group leadership info, keyed by group_id. */
+	memset(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(int);
+	ctl.entrysize = sizeof(leader_info);
+	ctl.hash = tag_hash;
+	ctl.hcxt = CurrentMemoryContext;
+	leaders = hash_create("test_group_locking leaders", 16, &ctl,
+						  HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	determine_leader_info(&plan, leaders);
+
+	/* Run every step in order. */
+	dlist_foreach(it, &plan)
+	{
+		TestGroupLockStep *s = dlist_container(TestGroupLockStep, node, it.cur);
+		leader_info *li;
+
+		if (s->command.op != TGL_START)
+		{
+			send_command(workers, s);
+			continue;
+		}
+
+		/* Group locking is used only by groups with more than one task. */
+		li = hash_search(leaders, &s->group_id, HASH_FIND, NULL);
+		Assert(li != NULL);
+		start_worker(workers, s->group_id, s->task_id,
+					 li->has_followers ? li->leader_task_id : -1);
+	}
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Drain, without blocking, every message currently queued by any worker.
+ *
+ * Passes over the whole worker table repeat until one full pass receives
+ * nothing.  Each received message goes to process_message(), which may
+ * rethrow a worker's error or remove the worker's entry; dynahash allows the
+ * element just returned by hash_seq_search to be deleted mid-scan.  A worker
+ * that has detached from its response queue is reported as an ERROR.
+ */
+static void
+check_for_messages(HTAB *worker_hash)
+{
+	bool		got_any;
+
+	do
+	{
+		HASH_SEQ_STATUS status;
+		worker_info *w;
+
+		got_any = false;
+		hash_seq_init(&status, worker_hash);
+		while ((w = hash_seq_search(&status)) != NULL)
+		{
+			Size		len;
+			void	   *payload;
+			shm_mq_result res;
+
+			res = shm_mq_receive(w->responseh, &len, &payload, true);
+			if (res == SHM_MQ_SUCCESS)
+			{
+				got_any = true;
+				process_message(worker_hash, w, payload, len);
+			}
+			else if (res == SHM_MQ_DETACHED)
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("connection to background worker %d.%d lost",
+								w->key.group_id, w->key.task_id)));
+		}
+	} while (got_any);
+}
+
+/*
+ * Populate leader_hash with one entry per lock group mentioned in the plan.
+ *
+ * A group's leader is the task of the first plan step naming that group.
+ * has_followers becomes true as soon as a step for a different task of the
+ * same group is seen.
+ */
+static void
+determine_leader_info(dlist_head *plan, HTAB *leader_hash)
+{
+	dlist_iter	cur;
+
+	dlist_foreach(cur, plan)
+	{
+		TestGroupLockStep *s = dlist_container(TestGroupLockStep, node, cur.cur);
+		bool		exists;
+		leader_info *entry;
+
+		entry = hash_search(leader_hash, &s->group_id, HASH_ENTER, &exists);
+		if (exists)
+		{
+			if (entry->leader_task_id != s->task_id)
+				entry->has_followers = true;
+			continue;
+		}
+
+		/* First mention of this group: this task leads it. */
+		entry->leader_task_id = s->task_id;
+		entry->has_followers = false;
+	}
+}
+
+/*
+ * Error context callback installed while rethrowing a report from a worker;
+ * arg is that worker's worker_info.
+ */
+static void
+error_callback(void *arg)
+{
+	worker_key *key = &((worker_info *) arg)->key;
+
+	errcontext("background worker, group %d, task %d",
+			   key->group_id, key->task_id);
+}
+
+/*
+ * SIGTERM handler, installed by test_group_locking_worker_main().
+ *
+ * Sets the process latch and, unless process exit is already in progress,
+ * flags a pending die interrupt (InterruptPending/ProcDiePending) for the
+ * interrupt-checking code to act on.  errno is saved and restored so the
+ * interrupted code does not see it changed by anything called here.
+ */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ /* Presumably MyProc can still be NULL very early in startup. */
+ if (MyProc)
+ SetLatch(&MyProc->procLatch);
+
+ if (!proc_exit_inprogress)
+ {
+ InterruptPending = true;
+ ProcDiePending = true;
+ }
+
+ errno = save_errno;
+}
+
+/*
+ * Check that the plan is coherent, adding implicit start and stop steps.
+ *
+ * Repeatedly locates the first step not yet verified and passes its task to
+ * rationalize_steps_for_task(), which verifies every step of that task and
+ * may add steps at either end of the list.  Each search restarts from the
+ * head because the list may have changed.
+ */
+static void
+rationalize_steps(dlist_head *plan)
+{
+	for (;;)
+	{
+		TestGroupLockStep *pending = NULL;
+		dlist_iter	it;
+
+		dlist_foreach(it, plan)
+		{
+			TestGroupLockStep *s = dlist_container(TestGroupLockStep, node, it.cur);
+
+			if (!s->verified)
+			{
+				pending = s;
+				break;
+			}
+		}
+
+		/* Everything verified: done. */
+		if (pending == NULL)
+			break;
+
+		rationalize_steps_for_task(plan, pending->group_id, pending->task_id);
+	}
+}
+
+/*
+ * Validate, and complete, the steps belonging to one task (group_id, task_id).
+ *
+ * Scans the whole plan, marking every step of this task as verified, and
+ * raises an ERROR unless:
+ *	- the task is started at most once, and a start step precedes all other
+ *	  steps of the task; and
+ *	- the task is stopped at most once, and no lock/unlock step follows a
+ *	  stop step.
+ * If the task has no explicit start step, one is added at the head of the
+ * plan; if it has no explicit stop step, one is added at the tail.  Added
+ * steps are created already verified.
+ */
+static void
+rationalize_steps_for_task(dlist_head *plan, int group_id, int task_id)
+{
+	dlist_iter	iter;
+	bool		saw_start = false;
+	bool		saw_stop = false;
+	bool		saw_other = false;
+
+	dlist_foreach(iter, plan)
+	{
+		TestGroupLockStep *step;
+
+		step = dlist_container(TestGroupLockStep, node, iter.cur);
+		if (step->group_id != group_id || step->task_id != task_id)
+			continue;
+		step->verified = true;
+
+		switch (step->command.op)
+		{
+			case TGL_START:
+				if (saw_start)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("can't start same worker more than once")));
+				/* Check stop and other separately so each gets its own message. */
+				if (saw_stop)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("can't start worker after stopping it")));
+				if (saw_other)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("worker can't perform actions before being started")));
+				saw_start = true;
+				break;
+			case TGL_STOP:
+				if (saw_stop)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("can't stop same worker more than once")));
+				saw_stop = true;
+				break;
+			default:
+				if (saw_stop)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("worker can't perform actions after being stopped")));
+				saw_other = true;
+				break;
+		}
+	}
+
+	/* Implicitly start the worker before anything else happens. */
+	if (!saw_start)
+	{
+		TestGroupLockStep *step;
+
+		step = (TestGroupLockStep *) palloc0(sizeof(TestGroupLockStep));
+		step->group_id = group_id;
+		step->task_id = task_id;
+		step->command.op = TGL_START;
+		step->verified = true;
+		dlist_push_head(plan, &step->node);
+	}
+
+	/* Implicitly stop the worker after everything else has happened. */
+	if (!saw_stop)
+	{
+		TestGroupLockStep *step;
+
+		step = (TestGroupLockStep *) palloc0(sizeof(TestGroupLockStep));
+		step->group_id = group_id;
+		step->task_id = task_id;
+		step->command.op = TGL_STOP;
+		step->verified = true;
+		dlist_push_tail(plan, &step->node);
+	}
+}
+
+/*
+ * Raise a syntax error for the specification at buf's current cursor.
+ *
+ * The reported position is 1-based and counted in characters, not bytes.
+ * At end of input we say so; otherwise we quote the offending, possibly
+ * multibyte, character.  This function never returns.
+ */
+static void
+report_syntax_error(StringInfo buf)
+{
+	int			pos = pg_mbstrlen_with_len(buf->data, buf->cursor) + 1;
+	const char *at;
+	int			charlen;
+	char		offending[MAX_MULTIBYTE_CHAR_LEN + 1];
+
+	if (buf->cursor >= buf->len)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("unexpected end of string at position %d", pos)));
+
+	at = buf->data + buf->cursor;
+	charlen = pg_mblen(at);
+	memcpy(offending, at, charlen);
+	offending[charlen] = '\0';
+
+	ereport(ERROR,
+			(errcode(ERRCODE_SYNTAX_ERROR),
+			 errmsg("unexpected character \"%s\" at position %d",
+					offending, pos)));
+}
+
+/*
+ * If the next byte of buf is c, consume it and return true; otherwise leave
+ * the cursor where it is and return false.
+ */
+static bool
+scan_character(StringInfo buf, char c)
+{
+	if (buf->cursor >= buf->len || buf->data[buf->cursor] != c)
+		return false;
+	buf->cursor++;
+	return true;
+}
+
+/* Return true once the entire input has been consumed. */
+static bool
+scan_eof(StringInfo buf)
+{
+	return !(buf->cursor < buf->len);
+}
+
+/*
+ * Scan a single-part identifier and return it as a palloc'd string.
+ *
+ * A leading double quote is handed to scan_quoted_identifier().  Otherwise we
+ * accept ASCII letters, underscores and any multibyte character, plus digits
+ * and '$' after the first character, and return the text downcased and
+ * truncated by downcase_truncate_identifier().  In the unquoted case NULL is
+ * returned, with the cursor unchanged, when no identifier character is found.
+ */
+static char *
+scan_identifier(StringInfo buf)
+{
+	int			begin = buf->cursor;
+
+	if (buf->data[begin] == '"')
+		return scan_quoted_identifier(buf);
+
+	while (buf->cursor < buf->len)
+	{
+		const char *p = &buf->data[buf->cursor];
+		int			width = pg_mblen(p);
+
+		if (width == 1)
+		{
+			char		ch = *p;
+			bool		is_alpha = (ch >= 'A' && ch <= 'Z') ||
+				(ch >= 'a' && ch <= 'z') || ch == '_';
+			bool		is_later = (ch >= '0' && ch <= '9') || ch == '$';
+
+			/* Digits and '$' may not begin an identifier. */
+			if (!is_alpha && !(is_later && buf->cursor != begin))
+				break;
+		}
+		else
+			Assert(width > 0);	/* multibyte characters are always allowed */
+
+		buf->cursor += width;
+	}
+
+	if (buf->cursor == begin)
+		return NULL;
+
+	return downcase_truncate_identifier(buf->data + begin,
+										buf->cursor - begin, false);
+}
+
+/*
+ * Scan an unsigned decimal integer.
+ *
+ * On success, stores the value in *result, leaves the cursor just past the
+ * last digit and returns true.  Returns false, leaving *result and the cursor
+ * untouched, if no digit is present.  A value that does not fit in an int
+ * raises an ERROR instead of overflowing, since signed overflow is undefined
+ * behavior in C.
+ */
+static bool
+scan_integer(StringInfo buf, int *result)
+{
+	int			start = buf->cursor;
+	int			val = 0;
+
+	while (buf->cursor < buf->len)
+	{
+		char		c = buf->data[buf->cursor];
+		int			digit;
+
+		if (c < '0' || c > '9')
+			break;
+		digit = c - '0';
+
+		/* Equivalent to val * 10 + digit > INT_MAX, without overflowing. */
+		if (val > (INT_MAX - digit) / 10)
+			ereport(ERROR,
+					(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+					 errmsg("integer out of range at position %d",
+							pg_mbstrlen_with_len(buf->data, start) + 1)));
+
+		val = val * 10 + digit;
+		++buf->cursor;
+	}
+
+	if (buf->cursor == start)
+		return false;
+	*result = val;
+	return true;
+}
+
+/*
+ * Scan a lock mode name (case-insensitive, e.g. "AccessShareLock") and return
+ * the matching LOCKMODE.  A missing identifier is a syntax error; an unknown
+ * name raises an "invalid lock mode" ERROR.
+ */
+static LOCKMODE
+scan_lockmode(StringInfo buf)
+{
+	char	   *mode = scan_identifier(buf);
+	int			i;
+
+	if (mode == NULL)
+		report_syntax_error(buf);
+
+	/*
+	 * lock_mode_names[] is indexed by LOCKMODE; entry 0 is a placeholder
+	 * rather than a real lock mode, so matching starts at 1.
+	 */
+	for (i = 1; i < (int) lengthof(lock_mode_names); i++)
+	{
+		if (pg_strcasecmp(mode, lock_mode_names[i]) == 0)
+			return (LOCKMODE) i;
+	}
+
+	ereport(ERROR,
+			(errcode(ERRCODE_SYNTAX_ERROR),
+			 errmsg("invalid lock mode: \"%s\"", mode)));
+}
+
+/*
+ * Scan an operation name (start, stop, lock or unlock; case-insensitive) and
+ * return the matching TestGroupLockOp.  A missing identifier is a syntax
+ * error; an unknown name raises an "invalid operation name" ERROR.
+ */
+static TestGroupLockOp
+scan_op(StringInfo buf)
+{
+	static const struct
+	{
+		const char *name;
+		TestGroupLockOp op;
+	}			known_ops[] =
+	{
+		{"start", TGL_START},
+		{"stop", TGL_STOP},
+		{"lock", TGL_LOCK},
+		{"unlock", TGL_UNLOCK}
+	};
+	char	   *opname = scan_identifier(buf);
+	int			i;
+
+	if (opname == NULL)
+		report_syntax_error(buf);
+
+	for (i = 0; i < (int) lengthof(known_ops); i++)
+	{
+		if (pg_strcasecmp(opname, known_ops[i].name) == 0)
+			return known_ops[i].op;
+	}
+
+	ereport(ERROR,
+			(errcode(ERRCODE_SYNTAX_ERROR),
+			 errmsg("invalid operation name: \"%s\"", opname)));
+}
+
+/*
+ * Scan a relation name of the form "name" or "schema.name" and return it as
+ * a RangeVar.  Either part may be quoted.  A missing part is a syntax error.
+ */
+static RangeVar *
+scan_qualified_identifier(StringInfo buf)
+{
+	char	   *schemaname = NULL;
+	char	   *relname = scan_identifier(buf);
+
+	if (relname == NULL)
+		report_syntax_error(buf);
+
+	/* A dot means what we read so far was the schema. */
+	if (scan_character(buf, '.'))
+	{
+		schemaname = relname;
+		relname = scan_identifier(buf);
+		if (relname == NULL)
+			report_syntax_error(buf);
+	}
+
+	return makeRangeVar(schemaname, relname, -1);
+}
+
+/*
+ * Scan a double-quoted identifier, starting at the opening quote.
+ *
+ * Inside the quotes a doubled quote ("") stands for one literal quote, and no
+ * case folding is done.  On success the cursor is left just past the closing
+ * quote, and a palloc'd copy of the identifier is returned, truncated to
+ * NAMEDATALEN - 1 bytes exactly like an unquoted identifier.  Returns NULL if
+ * the cursor is not at a quote, or if the input ends before the closing
+ * quote.  A zero-length identifier ("") raises an ERROR, as it does in SQL.
+ */
+static char *
+scan_quoted_identifier(StringInfo buf)
+{
+	StringInfoData result;
+	int			start = buf->cursor;
+
+	if (buf->data[start] != '"')
+		return NULL;
+
+	initStringInfo(&result);
+
+	while (++buf->cursor < buf->len)
+	{
+		char	   *s = &buf->data[buf->cursor];
+
+		/* Any byte other than a quote is part of the identifier. */
+		if (s[0] != '"')
+		{
+			appendStringInfoChar(&result, s[0]);
+			continue;
+		}
+
+		/* A doubled quote stands for one literal quote character. */
+		if (++buf->cursor < buf->len && s[1] == '"')
+		{
+			appendStringInfoChar(&result, '"');
+			continue;
+		}
+
+		/* This was the closing quote, and the cursor is already past it. */
+		if (result.len == 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("zero-length delimited identifier at position %d",
+							pg_mbstrlen_with_len(buf->data, start) + 1)));
+
+		/* Match the length limit applied to unquoted identifiers. */
+		truncate_identifier(result.data, result.len, false);
+		return result.data;
+	}
+
+	/* We ran off the end of the buffer with no close-quote. */
+	pfree(result.data);
+	return NULL;
+}
+
+/*
+ * Handle one protocol message received from a worker.
+ *
+ * 'E' (error) and 'N' (notice) messages are rethrown in this backend under an
+ * error context naming the worker; levels above ERROR are reduced to ERROR.
+ * 'C' (command complete) acknowledges the worker's outstanding command.  The
+ * tag "STOP" means the worker is exiting, so we detach from its segment and
+ * remove it from worker_hash.  Any other message type, or an acknowledgement
+ * we were not waiting for, is an ERROR.
+ */
+static void
+process_message(HTAB *worker_hash, worker_info *info, char *message,
+				Size message_bytes)
+{
+	StringInfoData msg;
+	char		msgtype;
+	const char *tag;
+
+	initStringInfo(&msg);
+	enlargeStringInfo(&msg, message_bytes);
+	appendBinaryStringInfo(&msg, message, message_bytes);
+	msgtype = pq_getmsgbyte(&msg);
+
+	switch (msgtype)
+	{
+		case 'E':
+		case 'N':
+			{
+				ErrorData	edata;
+				ErrorContextCallback errctx;
+
+				pq_parse_errornotice(&msg, &edata);
+				edata.elevel = Min(edata.elevel, ERROR);
+
+				/* Rethrow with the worker's identity attached. */
+				errctx.callback = error_callback;
+				errctx.arg = info;
+				errctx.previous = error_context_stack;
+				error_context_stack = &errctx;
+				ThrowErrorData(&edata);
+				error_context_stack = errctx.previous;
+				return;
+			}
+		case 'C':
+			break;
+		default:
+			elog(ERROR, "unknown message type: %c (%zu bytes)",
+				 msg.data[0], message_bytes);
+	}
+
+	tag = pq_getmsgstring(&msg);
+
+	if (!info->awaiting_response)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unexpected acknowledgement from worker %d.%d: \"%s\"",
+						info->key.group_id, info->key.task_id, tag)));
+	info->awaiting_response = false;
+
+	if (strcmp(tag, "STOP") != 0)
+		return;
+
+	/* The worker is going away: forget about it. */
+	dsm_detach(info->seg);
+	hash_search(worker_hash, &info->key, HASH_REMOVE, NULL);
+}
+
+/*
+ * Send one stop/lock/unlock command to the worker named by step, then wait
+ * until that worker acknowledges it.
+ *
+ * A NOTICE describing the command is emitted first.  While waiting we drain
+ * messages from every worker, so errors or notices from any of them surface
+ * promptly.  When a STOP is acknowledged, process_message() removes the
+ * worker's hash entry, so the entry is looked up again on each pass rather
+ * than reused after it may have been removed.
+ */
+static void
+send_command(HTAB *worker_hash, TestGroupLockStep *step)
+{
+	worker_key	key;
+	worker_info *info;
+	shm_mq_result result;
+
+	key.group_id = step->group_id;
+	key.task_id = step->task_id;
+	info = hash_search(worker_hash, &key, HASH_FIND, NULL);
+	if (info == NULL)
+		elog(ERROR, "background worker %d.%d is not running",
+			 step->group_id, step->task_id);
+
+	/* Display progress report. */
+	switch (step->command.op)
+	{
+		case TGL_STOP:
+			ereport(NOTICE,
+					(errmsg("stopping worker %d.%d",
+							step->group_id, step->task_id)));
+			break;
+
+		case TGL_LOCK:
+			ereport(NOTICE,
+					(errmsg("instructing worker %d.%d to acquire %s on relation with OID %u",
+							step->group_id, step->task_id,
+							lock_mode_names[step->command.lockmode],
+							step->command.relid)));
+			break;
+
+		case TGL_UNLOCK:
+			ereport(NOTICE,
+					(errmsg("instructing worker %d.%d to release %s on relation with OID %u",
+							step->group_id, step->task_id,
+							lock_mode_names[step->command.lockmode],
+							step->command.relid)));
+			break;
+
+		default:
+			elog(ERROR, "bad operation code: %d", (int) step->command.op);
+	}
+
+	/* Transmit command to worker. */
+	result = shm_mq_send(info->requesth, sizeof(TestGroupLockCommand),
+						 &step->command, false);
+	if (result != SHM_MQ_SUCCESS)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("connection to background worker %d.%d lost",
+						step->group_id, step->task_id)));
+	info->awaiting_response = true;
+
+	/* Wait for the acknowledgement. */
+	for (;;)
+	{
+		int			rc;
+
+		check_for_messages(worker_hash);
+
+		/* A missing entry means the worker acknowledged STOP and was removed. */
+		info = hash_search(worker_hash, &key, HASH_FIND, NULL);
+		if (info == NULL || !info->awaiting_response)
+			break;
+
+		rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
+
+		/* Without this check we would spin forever once the postmaster dies. */
+		if (rc & WL_POSTMASTER_DEATH)
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("postmaster exited while waiting for background worker %d.%d",
+							step->group_id, step->task_id)));
+
+		CHECK_FOR_INTERRUPTS();
+		ResetLatch(&MyProc->procLatch);
+	}
+}
+
+/*
+ * Start background worker (group_id, task_id) and wait until it reports that
+ * it is running.
+ *
+ * leader_task_id is -1 if the worker should not use group locking.
+ * Otherwise the worker joins lock group group_id: it becomes the leader if
+ * task_id == leader_task_id, and otherwise follows the leader, which must
+ * already be running because its PID is passed to the new worker.
+ *
+ * The worker gets a DSM segment containing a worker_fixed_data (toc key 0),
+ * a request queue we send commands on (key 1) and a response queue we read
+ * from (key 2).
+ */
+static void
+start_worker(HTAB *worker_hash, int group_id, int task_id, int leader_task_id)
+{
+	worker_key	key;
+	worker_info *info;
+	bool		found;
+	shm_toc_estimator e;
+	Size		segsize;
+	shm_toc    *toc;
+	worker_fixed_data *fdata;
+	shm_mq	   *requestq;
+	shm_mq	   *responseq;
+	BackgroundWorker worker;
+
+	/* Set up entry in hash table, zeroing everything but the key. */
+	key.group_id = group_id;
+	key.task_id = task_id;
+	info = hash_search(worker_hash, &key, HASH_ENTER, &found);
+	Assert(!found);
+	memset(((char *) info) + sizeof(worker_key), 0,
+		   sizeof(worker_info) - sizeof(worker_key));
+
+	/* Log a message explaining what we're going to do. */
+	if (leader_task_id < 0)
+		ereport(NOTICE,
+				(errmsg("starting worker %d.%d", group_id, task_id)));
+	else if (task_id == leader_task_id)
+		ereport(NOTICE,
+				(errmsg("starting worker %d.%d as group leader",
+						group_id, task_id)));
+	else
+		ereport(NOTICE,
+				(errmsg("starting worker %d.%d with group leader %d.%d",
+						group_id, task_id, group_id, leader_task_id)));
+
+	/* Create dynamic shared memory segment and table of contents. */
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(worker_fixed_data));
+	shm_toc_estimate_chunk(&e, SHM_QUEUE_SIZE);
+	shm_toc_estimate_chunk(&e, SHM_QUEUE_SIZE);
+	shm_toc_estimate_keys(&e, 3);
+	segsize = shm_toc_estimate(&e);
+	info->seg = dsm_create(segsize);
+	toc = shm_toc_create(TEST_GROUP_LOCKING_MAGIC,
+						 dsm_segment_address(info->seg), segsize);
+
+	/*
+	 * Store fixed-size data in dynamic shared memory.  Set every field
+	 * explicitly: dsm_create() does not promise zeroed memory, and the worker
+	 * always reads use_group_locking.
+	 */
+	fdata = shm_toc_allocate(toc, sizeof(worker_fixed_data));
+	fdata->database_id = MyDatabaseId;
+	fdata->authenticated_user_id = GetAuthenticatedUserId();
+	namestrcpy(&fdata->database, get_database_name(MyDatabaseId));
+	namestrcpy(&fdata->authenticated_user,
+			   GetUserNameFromId(fdata->authenticated_user_id));
+	fdata->use_group_locking = (leader_task_id >= 0);
+	fdata->leader_pid = 0;		/* with group locking, 0 = become leader */
+
+	if (fdata->use_group_locking && task_id != leader_task_id)
+	{
+		/* A follower needs the PID of its already-running leader. */
+		worker_key	lkey;
+		worker_info *leader;
+
+		lkey.group_id = group_id;
+		lkey.task_id = leader_task_id;
+		leader = hash_search(worker_hash, &lkey, HASH_FIND, NULL);
+		if (leader == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("lock group leader %d.%d is not running",
+							group_id, leader_task_id)));
+		if (GetBackgroundWorkerPid(leader->handle, &fdata->leader_pid)
+			!= BGWH_STARTED)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("could not determine PID of leader %d.%d",
+							group_id, leader_task_id)));
+	}
+	shm_toc_insert(toc, 0, fdata);
+
+	/* Establish message queues in dynamic shared memory. */
+	requestq = shm_mq_create(shm_toc_allocate(toc, SHM_QUEUE_SIZE),
+							 SHM_QUEUE_SIZE);
+	shm_toc_insert(toc, 1, requestq);
+	shm_mq_set_sender(requestq, MyProc);
+	info->requesth = shm_mq_attach(requestq, info->seg, NULL);
+	responseq = shm_mq_create(shm_toc_allocate(toc, SHM_QUEUE_SIZE),
+							  SHM_QUEUE_SIZE);
+	shm_toc_insert(toc, 2, responseq);
+	shm_mq_set_receiver(responseq, MyProc);
+	info->responseh = shm_mq_attach(responseq, info->seg, NULL);
+
+	/* Configure a worker. */
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_main = NULL;		/* new worker might not have library loaded */
+	snprintf(worker.bgw_library_name, BGW_MAXLEN, "test_group_locking");
+	snprintf(worker.bgw_function_name, BGW_MAXLEN,
+			 "test_group_locking_worker_main");
+	snprintf(worker.bgw_name, BGW_MAXLEN,
+			 "test_group_locking %d/%d by PID %d", group_id, task_id, MyProcPid);
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(info->seg));
+	/* set bgw_notify_pid, so we can detect if the worker stops */
+	worker.bgw_notify_pid = MyProcPid;
+
+	/* Register the worker. */
+	if (!RegisterDynamicBackgroundWorker(&worker, &info->handle))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not register background process"),
+				 errhint("You may need to increase max_worker_processes.")));
+	shm_mq_set_handle(info->requesth, info->handle);
+	shm_mq_set_handle(info->responseh, info->handle);
+
+	/* Wait for the worker to come online and send its START message. */
+	info->awaiting_response = true;
+	for (;;)
+	{
+		int			rc;
+
+		check_for_messages(worker_hash);
+		if (!info->awaiting_response)
+			break;
+
+		rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
+
+		/* Without this check we would spin forever once the postmaster dies. */
+		if (rc & WL_POSTMASTER_DEATH)
+			ereport(FATAL,
+					(errcode(ERRCODE_ADMIN_SHUTDOWN),
+					 errmsg("postmaster exited while waiting for background worker %d.%d",
+							group_id, task_id)));
+
+		CHECK_FOR_INTERRUPTS();
+		ResetLatch(&MyProc->procLatch);
+	}
+}
+
+/*
+ * Background worker entrypoint.
+ *
+ * main_arg carries the handle of the DSM segment built by start_worker().  We
+ * attach to it and redirect our protocol output, including errors, to the
+ * response queue.  We then connect to the starting backend's database as its
+ * authenticated user, optionally join a lock group, and report "START".
+ * After that we execute commands from the request queue inside a single
+ * transaction, acknowledging each one.  A stop command commits and exits.
+ */
+void
+test_group_locking_worker_main(Datum main_arg)
+{
+	dsm_segment *seg;
+	shm_toc    *toc;
+	shm_mq	   *requestq;
+	shm_mq	   *responseq;
+	shm_mq_handle *requesth;
+	shm_mq_handle *responseh;
+	worker_fixed_data *fdata;
+
+	/* Establish signal handlers. */
+	pqsignal(SIGTERM, handle_sigterm);
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up a memory context and resource owner. */
+	Assert(CurrentResourceOwner == NULL);
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "test_group_locking");
+	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+												 "test_group_locking",
+												 ALLOCSET_DEFAULT_MINSIZE,
+												 ALLOCSET_DEFAULT_INITSIZE,
+												 ALLOCSET_DEFAULT_MAXSIZE);
+
+	/*
+	 * Connect to the dynamic shared memory segment.  start_worker() encoded
+	 * the (unsigned) dsm_handle with UInt32GetDatum, so decode it the same
+	 * way.
+	 */
+	seg = dsm_attach(DatumGetUInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unable to map dynamic shared memory segment")));
+	toc = shm_toc_attach(TEST_GROUP_LOCKING_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/* Find shared memory queues and attach to them. */
+	requestq = shm_toc_lookup(toc, 1);
+	shm_mq_set_receiver(requestq, MyProc);
+	requesth = shm_mq_attach(requestq, seg, NULL);
+	responseq = shm_toc_lookup(toc, 2);
+	shm_mq_set_sender(responseq, MyProc);
+	responseh = shm_mq_attach(responseq, seg, NULL);
+	pq_redirect_to_shm_mq(responseq, responseh);
+
+	/* Connect to database. */
+	fdata = shm_toc_lookup(toc, 0);
+	BackgroundWorkerInitializeConnection(NameStr(fdata->database),
+										 NameStr(fdata->authenticated_user));
+	if (fdata->database_id != MyDatabaseId ||
+		fdata->authenticated_user_id != GetAuthenticatedUserId())
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("user or database renamed during worker startup")));
+
+	/* Activate group locking, if appropriate. */
+	if (fdata->use_group_locking)
+	{
+		if (fdata->leader_pid == 0)
+			BecomeLockGroupLeader();
+		else
+		{
+			PGPROC	   *proc;
+
+			/*
+			 * This is a cheesy hack that I'm going with for the sake of
+			 * getting this test code running.  Don't really do it this way!
+			 *
+			 * In a real parallel computation, all of the workers in a lock
+			 * group would be started by the same process, which should pass
+			 * its own value of MyProc and its PID to those followers.  That
+			 * way, if the leader exits before the children are up and
+			 * running, they'll fail to join the lock group unless (a) the
+			 * same PID is again running and (b) it is a PostgreSQL process
+			 * and (c) it is using the same PGPROC as before and (d) it is
+			 * again a lock group leader.  Looking up the proc using the PID,
+			 * as we're doing here, loses the third of those guarantees -
+			 * which is not a catastrophe, but best avoided.
+			 */
+			proc = BackendPidGetProc(fdata->leader_pid);
+			if (proc == NULL
+				|| !BecomeLockGroupFollower(proc, fdata->leader_pid))
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("could not join lock group for leader PID %d",
+								fdata->leader_pid)));
+		}
+	}
+
+	/* Inform the backend that started us that we're up and running. */
+	pq_putmessage('C', "START", 6);
+
+	/* Begin a transaction; all locks are taken inside it. */
+	StartTransactionCommand();
+
+	/* Main loop: read and process messages. */
+	for (;;)
+	{
+		Size		nbytes;
+		void	   *data;
+		shm_mq_result result;
+		TestGroupLockCommand *command;
+
+		result = shm_mq_receive(requesth, &nbytes, &data, false);
+		if (result != SHM_MQ_SUCCESS)
+			ereport(FATAL,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("connection to user backend lost")));
+		if (nbytes != sizeof(TestGroupLockCommand))
+			ereport(FATAL,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("invalid command message from user backend")));
+		command = data;
+
+		switch (command->op)
+		{
+			case TGL_STOP:
+				CommitTransactionCommand();
+				pq_putmessage('C', "STOP", 5);
+				/* Exit through PostgreSQL's normal path rather than exit(). */
+				proc_exit(0);
+				break;			/* not reached */
+
+			case TGL_LOCK:
+				/* Never wait: a conflicting lock is reported as an error. */
+				if (!ConditionalLockRelationOid(command->relid,
+												command->lockmode))
+					ereport(ERROR,
+							(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+							 errmsg("could not obtain %s on relation with OID %u",
+									lock_mode_names[command->lockmode],
+									command->relid)));
+				pq_putmessage('C', "LOCK", 5);
+				break;
+
+			case TGL_UNLOCK:
+				UnlockRelationOid(command->relid, command->lockmode);
+				pq_putmessage('C', "UNLOCK", 7);
+				break;
+
+			default:
+				elog(ERROR, "unknown operation: %d", (int) command->op);
+		}
+	}
+}
diff --git a/contrib/test_group_locking/test_group_locking.control b/contrib/test_group_locking/test_group_locking.control
new file mode 100644
index 0000000..3b69359
--- /dev/null
+++ b/contrib/test_group_locking/test_group_locking.control
@@ -0,0 +1,4 @@
+comment = 'Test code for group locking'
+default_version = '1.0'
+module_pathname = '$libdir/test_group_locking'
+relocatable = true
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers