On Wed, Nov 5, 2014 at 9:26 PM, Robert Haas <robertmh...@gmail.com> wrote:
> Yes, I think that's probably a net improvement in robustness quite
> apart from what we decide to do about any of the rest of this.  I've
> attached it here as revise-procglobal-tracking.patch and will commit
> that bit if nobody objects.  The remainder is reattached without
> change as group-locking-v0.1.patch.
>
> Per your other comment, I've developed the beginnings of a testing
> framework which I attached here as test_group_locking-v0.patch.

Urk.  I attached a version with some stupid bugs.  Here's a slightly better one.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/contrib/test_group_locking/Makefile b/contrib/test_group_locking/Makefile
new file mode 100644
index 0000000..2d09341
--- /dev/null
+++ b/contrib/test_group_locking/Makefile
@@ -0,0 +1,21 @@
+# contrib/test_group_locking/Makefile
+
+MODULE_big = test_group_locking
+OBJS = test_group_locking.o $(WIN32RES)
+PGFILEDESC = "test_group_locking - test harness for group locking"
+
+EXTENSION = test_group_locking
+DATA = test_group_locking--1.0.sql
+
+REGRESS = test_group_locking
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/test_group_locking
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/test_group_locking/test_group_locking--1.0.sql b/contrib/test_group_locking/test_group_locking--1.0.sql
new file mode 100644
index 0000000..adb2be5
--- /dev/null
+++ b/contrib/test_group_locking/test_group_locking--1.0.sql
@@ -0,0 +1,8 @@
+/* contrib/test_group_locking/test_group_locking--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_group_locking" to load this file. \quit
+
+CREATE FUNCTION test_group_locking(spec pg_catalog.text)
+    RETURNS pg_catalog.void STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/test_group_locking/test_group_locking.c b/contrib/test_group_locking/test_group_locking.c
new file mode 100644
index 0000000..904beff
--- /dev/null
+++ b/contrib/test_group_locking/test_group_locking.c
@@ -0,0 +1,1071 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_group_locking.c
+ *		Test harness code for group locking.
+ *
+ * Copyright (C) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/test_shm_mq/test.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "catalog/namespace.h"
+#include "commands/dbcommands.h"
+#include "fmgr.h"
+#include "lib/ilist.h"
+#include "lib/stringinfo.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "nodes/makefuncs.h"
+#include "parser/scansup.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/procarray.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "utils/builtins.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(test_group_locking);
+
+void test_group_locking_worker_main(Datum main_arg);
+
+/* Names of lock modes, for debug printouts */
+static const char *const lock_mode_names[] =
+{
+	"INVALID",
+	"AccessShareLock",
+	"RowShareLock",
+	"RowExclusiveLock",
+	"ShareUpdateExclusiveLock",
+	"ShareLock",
+	"ShareRowExclusiveLock",
+	"ExclusiveLock",
+	"AccessExclusiveLock"
+};
+
+typedef enum
+{
+	TGL_START,
+	TGL_STOP,
+	TGL_LOCK,
+	TGL_UNLOCK
+} TestGroupLockOp;
+
+typedef struct
+{
+	TestGroupLockOp op;
+	LOCKMODE lockmode;
+	Oid	relid;
+} TestGroupLockCommand;
+
+typedef struct
+{
+	dlist_node node;
+	bool verified;
+	int	group_id;
+	int task_id;
+	TestGroupLockCommand command;
+} TestGroupLockStep;
+
+typedef struct
+{
+	int group_id;
+	int task_id;
+} worker_key;
+
+typedef struct
+{
+	worker_key	key;
+	dsm_segment *seg;
+	BackgroundWorkerHandle *handle;
+	shm_mq_handle *requesth;
+	shm_mq_handle *responseh;
+	bool awaiting_response;
+} worker_info;
+
+typedef struct
+{
+	int	group_id;
+	int	leader_task_id;
+	bool has_followers;
+} leader_info;
+
+/* Fixed-size data passed via our dynamic shared memory segment. */
+typedef struct worker_fixed_data
+{
+	Oid	database_id;
+	Oid	authenticated_user_id;
+	NameData	database;
+	NameData	authenticated_user;
+	bool		use_group_locking;
+	pid_t		leader_pid;
+} worker_fixed_data;
+
+#define SHM_QUEUE_SIZE					32768
+#define TEST_GROUP_LOCKING_MAGIC		0x4c616e65
+
+static void check_for_messages(HTAB *worker_hash);
+static void determine_leader_info(dlist_head *plan, HTAB *leader_hash);
+static void handle_sigterm(SIGNAL_ARGS);
+static void process_message(HTAB *worker_hash, worker_info *info,
+				char *message, Size message_bytes);
+static void rationalize_steps(dlist_head *plan);
+static void rationalize_steps_for_task(dlist_head *plan, int group_id,
+						   int task_id);
+static void report_syntax_error(StringInfo buf);
+static bool scan_character(StringInfo buf, char c);
+static bool scan_eof(StringInfo buf);
+static char *scan_identifier(StringInfo buf);
+static bool scan_integer(StringInfo buf, int *result);
+static LOCKMODE scan_lockmode(StringInfo buf);
+static TestGroupLockOp scan_op(StringInfo buf);
+static RangeVar *scan_qualified_identifier(StringInfo buf);
+static char *scan_quoted_identifier(StringInfo buf);
+static void send_command(HTAB *worker_hash, TestGroupLockStep *step);
+static void start_worker(HTAB *worker_hash, int group_id, int task_id,
+						 int leader_task_id);
+
+/*--------------------------------------------------------------------------
+ * Main entrypoint.
+ *
+ * Start background workers and have them issue lock requests against
+ * specified relations.  We use a little mini-language to control this:
+ *
+ * N[.M]:start
+ * N[.M]:stop
+ * N[.M]:lock:lockmode:relation
+ * N[.M]:unlock:lockmode:relation
+ *
+ * N and M should be integers.  M can be omitted, in which case it defaults
+ * to 0.  Each (N, M) pair identifies a separate worker; those with the
+ * same value of N are in the same lock group.  All workers not started
+ * explicitly are started before any other actions are taken; and all
+ * workers not terminated explicitly are terminated after all other actions
+ * are taken.
+ *--------------------------------------------------------------------------
+ */
+Datum
+test_group_locking(PG_FUNCTION_ARGS)
+{
+	text *spec = PG_GETARG_TEXT_PP(0);
+	StringInfo buf = makeStringInfo();
+	dlist_head plan;
+	dlist_iter iter;
+	HASHCTL		hashctl;
+	HTAB	   *worker_hash;
+	HTAB	   *leader_hash;
+
+	appendBinaryStringInfo(buf, VARDATA_ANY(spec), VARSIZE_ANY_EXHDR(spec));
+	dlist_init(&plan);
+
+	/* Parse the user-provided specification. */
+	for (;;)
+	{
+		TestGroupLockStep *step;
+
+		step = (TestGroupLockStep *) palloc0(sizeof(TestGroupLockStep));
+		if (!scan_integer(buf, &step->group_id))
+			report_syntax_error(buf);
+		if (scan_character(buf, '.') && !scan_integer(buf, &step->task_id))
+			report_syntax_error(buf);
+		if (!scan_character(buf, ':'))
+			report_syntax_error(buf);
+		step->command.op = scan_op(buf);
+
+		if (step->command.op == TGL_LOCK || step->command.op == TGL_UNLOCK)
+		{
+			RangeVar *rv;
+			if (!scan_character(buf, ':'))
+				report_syntax_error(buf);
+			step->command.lockmode = scan_lockmode(buf);
+			if (!scan_character(buf, ':'))
+				report_syntax_error(buf);
+			rv = scan_qualified_identifier(buf);
+
+			/*
+			 * Since we're trying to test locking here, don't take a lock
+			 * when locking the relation.  That's unsafe in the presence of
+			 * concurrent DDL, but since this is just test code, we don't
+			 * care.
+			 */
+			step->command.relid = RangeVarGetRelid(rv, NoLock, false);
+		}
+
+		dlist_push_tail(&plan, &step->node);
+
+		if (scan_eof(buf))
+			break;
+		if (!scan_character(buf, ','))
+			report_syntax_error(buf);
+	}
+
+	/* Make sure the series of steps looks sensible. */
+	rationalize_steps(&plan);
+
+	/* Initialize worker hash table. */
+	memset(&hashctl, 0, sizeof(HASHCTL));
+	hashctl.keysize = sizeof(worker_key);
+	hashctl.entrysize = sizeof(worker_info);
+	hashctl.hcxt = CurrentMemoryContext;
+	hashctl.hash = tag_hash;
+	worker_hash = hash_create("test_group_locking workers", 16, &hashctl,
+							  HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	/* Initialize leader hash table. */
+	memset(&hashctl, 0, sizeof(HASHCTL));
+	hashctl.keysize = sizeof(int);
+	hashctl.entrysize = sizeof(leader_info);
+	hashctl.hcxt = CurrentMemoryContext;
+	hashctl.hash = tag_hash;
+	leader_hash = hash_create("test_group_locking leaders", 16, &hashctl,
+							  HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	/* Determine group leadership information. */
+	determine_leader_info(&plan, leader_hash);
+
+	/* Execute the plan. */
+	dlist_foreach(iter, &plan)
+	{
+		TestGroupLockStep   *step;
+
+		step = dlist_container(TestGroupLockStep, node, iter.cur);
+
+		if (step->command.op == TGL_START)
+		{
+			leader_info *li;
+
+			li = hash_search(leader_hash, &step->group_id, HASH_FIND, NULL);
+			Assert(li != NULL);
+			start_worker(worker_hash, step->group_id, step->task_id,
+						 li->has_followers ? li->leader_task_id : -1);
+			continue;
+		}
+
+		send_command(worker_hash, step);
+	}
+
+	PG_RETURN_VOID();
+}
+
+/* Check for messages from our workers. */
+static void
+check_for_messages(HTAB *worker_hash)
+{
+	bool progress = true;
+
+	while (progress)
+	{
+		HASH_SEQ_STATUS hash_seq;
+		worker_info *info;
+
+		progress = false;
+		hash_seq_init(&hash_seq, worker_hash);
+		while ((info = hash_seq_search(&hash_seq)) != NULL)
+		{
+			shm_mq_result result;
+			Size nbytes;
+			void *data;
+
+			result = shm_mq_receive(info->responseh, &nbytes, &data, true);
+			if (result == SHM_MQ_DETACHED)
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("connection to background worker %d.%d lost",
+								info->key.group_id, info->key.task_id)));
+			if (result == SHM_MQ_SUCCESS)
+			{
+				progress = true;
+				process_message(worker_hash, info, data, nbytes);
+			}
+		}
+	}
+}
+
+/* Determine leadership information for each group. */
+static void
+determine_leader_info(dlist_head *plan, HTAB *leader_hash)
+{
+	dlist_iter	iter;
+
+	dlist_foreach(iter, plan)
+	{
+		TestGroupLockStep   *step;
+		leader_info *li;
+		bool found;
+
+		step = dlist_container(TestGroupLockStep, node, iter.cur);
+		li = hash_search(leader_hash, &step->group_id, HASH_ENTER, &found);
+		if (!found)
+		{
+			li->leader_task_id = step->task_id;
+			li->has_followers = false;
+		}
+		else if (step->task_id != li->leader_task_id)
+			li->has_followers = true;
+	}
+}
+
+/* Error context callback. */
+static void
+error_callback(void *arg)
+{
+	worker_info *info = arg;
+
+	errcontext("background worker, group %d, task %d", info->key.group_id,
+			   info->key.task_id);
+}
+
+/* Handle SIGTERM. */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+
+	if (MyProc)
+		SetLatch(&MyProc->procLatch);
+
+	if (!proc_exit_inprogress)
+	{
+		InterruptPending = true;
+		ProcDiePending = true;
+	}
+
+	errno = save_errno;
+}
+
+/*
+ * Make sure we have a rational series of steps, and add missing start and
+ * stop steps as needed.
+ */
+static void
+rationalize_steps(dlist_head *plan)
+{
+	bool	progress = true;
+
+	while (progress)
+	{
+		dlist_iter	iter;
+		progress = false;
+
+		dlist_foreach(iter, plan)
+		{
+			TestGroupLockStep   *step;
+
+			step = dlist_container(TestGroupLockStep, node, iter.cur);
+			if (!step->verified)
+			{
+				rationalize_steps_for_task(plan, step->group_id,
+										   step->task_id);
+				progress = true;
+				break;
+			}
+		}
+	}
+}
+
+/*
+ * Linear search through the provided list of steps.  Figure out whether any
+ * start action is unique and precedes all other actions for this task, and
+ * whether any stop action is unique and follow all other such actions.  If
+ * the steps are out of order, error; if they are missing, add them at the
+ * beginning and end as appropriate.
+ */
+static void
+rationalize_steps_for_task(dlist_head *plan, int group_id, int task_id)
+{
+	dlist_iter	iter;
+	bool		saw_start = false;
+	bool		saw_stop = false;
+	bool		saw_other = false;
+
+	dlist_foreach(iter, plan)
+	{
+		TestGroupLockStep   *step;
+
+		step = dlist_container(TestGroupLockStep, node, iter.cur);
+		if (step->group_id != group_id || step->task_id != task_id)
+			continue;
+		step->verified = true;
+
+		switch (step->command.op)
+		{
+			case TGL_START:
+				if (saw_start)
+					ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("can't start same worker more than once")));
+				if (saw_stop || saw_other)
+					ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("can't start worker after stopping it")));
+				if (saw_other)
+					ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("worker can't perform actions before being started")));
+				saw_start = true;
+				break;
+			case TGL_STOP:
+				if (saw_stop)
+					ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("can't stop same worker more than once")));
+				saw_stop = true;
+				break;
+			default:
+				if (saw_stop)
+					ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("worker can't perform actions after being stopped")));
+				saw_other = true;
+				break;
+		}
+	}
+
+	if (!saw_start)
+	{
+		TestGroupLockStep   *step;
+
+		step = (TestGroupLockStep *) palloc0(sizeof(TestGroupLockStep));
+		step->group_id = group_id;
+		step->task_id = task_id;
+		step->command.op = TGL_START;
+		step->verified = true;
+		dlist_push_head(plan, &step->node);
+	}
+
+	if (!saw_stop)
+	{
+		TestGroupLockStep   *step;
+
+		step = (TestGroupLockStep *) palloc0(sizeof(TestGroupLockStep));
+		step->group_id = group_id;
+		step->task_id = task_id;
+		step->command.op = TGL_STOP;
+		step->verified = true;
+		dlist_push_tail(plan, &step->node);
+	}
+}
+
+/* Report a syntax error. */
+static void
+report_syntax_error(StringInfo buf)
+{
+	char badchar[MAX_MULTIBYTE_CHAR_LEN + 1];
+	int	badpos = pg_mbstrlen_with_len(buf->data, buf->cursor) + 1;
+	int badcharlen;
+
+	if (buf->cursor >= buf->len)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("unexpected end of string at position %d", badpos)));
+
+	badcharlen = pg_mblen(&buf->data[buf->cursor]);
+	memcpy(badchar, &buf->data[buf->cursor], badcharlen);
+	badchar[badcharlen] = '\0';
+
+	ereport(ERROR,
+			(errcode(ERRCODE_SYNTAX_ERROR),
+			 errmsg("unexpected character \"%s\" at position %d",
+				badchar, badpos)));
+}
+
+/* Scan a given character. */
+static bool
+scan_character(StringInfo buf, char c)
+{
+	if (buf->cursor < buf->len && buf->data[buf->cursor] == c)
+	{
+		++buf->cursor;
+		return true;
+	}
+	return false;
+}
+
+/* Scan end-of-buffer. */
+static bool
+scan_eof(StringInfo buf)
+{
+	return buf->cursor >= buf->len;
+}
+
+/* Scan and return a single-part PostgreSQL identifier. */
+static char *
+scan_identifier(StringInfo buf)
+{
+	int			start = buf->cursor;
+
+	if (buf->data[start] == '"')
+		return scan_quoted_identifier(buf);
+
+	while (buf->cursor < buf->len)
+	{
+		int		len = pg_mblen(&buf->data[buf->cursor]);
+		char	c;
+
+		/* Multibyte characters are allowed. */
+		if (len != 1)
+		{
+			Assert(len > 0);
+			buf->cursor += len;
+			continue;
+		}
+
+		/* Alphabetic characters, and underscore, are allowed. */
+		c = buf->data[buf->cursor];
+		if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || c == '_')
+		{
+			buf->cursor++;
+			continue;
+		}
+
+		/* Numeric digits, and $, are allowed but not at character 1. */
+		if (start != buf->cursor && ((c >= '0' && c <= '9') || c == '$'))
+		{
+			buf->cursor++;
+			continue;
+		}
+
+		/* Anything else is not allowed. */
+		break;
+	}
+
+	/* Return NULL if we didn't find an identifier. */
+	if (buf->cursor == start)
+		return NULL;
+
+	/* Return a copy of the identifier with appropriate case-folding. */
+	return downcase_truncate_identifier(&buf->data[start], buf->cursor - start,
+		false);
+}
+
+/* Scan an integer. */
+static bool
+scan_integer(StringInfo buf, int *result)
+{
+	int		start = buf->cursor;
+	int		val = 0;
+
+	while (buf->cursor < buf->len)
+	{
+		char c = buf->data[buf->cursor];
+
+		if (c < '0' || c > '9')
+			break;
+		val = val * 10 + (c - '0');
+		++buf->cursor;
+	}
+
+	if (buf->cursor == start)
+		return false;
+	*result = val;
+	return true;
+}
+
+/* Scan and return a lock mode. */
+static LOCKMODE
+scan_lockmode(StringInfo buf)
+{
+	char *mode = scan_identifier(buf);
+
+	if (mode == NULL)
+		report_syntax_error(buf);
+
+	if (pg_strcasecmp(mode, "AccessShareLock") == 0)
+		return AccessShareLock;
+	else if (pg_strcasecmp(mode, "RowShareLock") == 0)
+		return RowShareLock;
+	else if (pg_strcasecmp(mode, "RowExclusiveLock") == 0)
+		return RowExclusiveLock;
+	else if (pg_strcasecmp(mode, "ShareUpdateExclusiveLock") == 0)
+		return ShareUpdateExclusiveLock;
+	else if (pg_strcasecmp(mode, "ShareLock") == 0)
+		return ShareLock;
+	else if (pg_strcasecmp(mode, "ShareRowExclusiveLock") == 0)
+		return ShareRowExclusiveLock;
+	else if (pg_strcasecmp(mode, "ExclusiveLock") == 0)
+		return ExclusiveLock;
+	else if (pg_strcasecmp(mode, "AccessExclusiveLock") == 0)
+		return AccessExclusiveLock;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_SYNTAX_ERROR),
+			 errmsg("invalid lock mode: \"%s\"", mode)));
+}
+
+/* Scan and return an operation name. */
+static TestGroupLockOp
+scan_op(StringInfo buf)
+{
+	char *opname = scan_identifier(buf);
+
+	if (opname == NULL)
+		report_syntax_error(buf);
+
+	if (pg_strcasecmp(opname, "start") == 0)
+		return TGL_START;
+	else if (pg_strcasecmp(opname, "stop") == 0)
+		return TGL_STOP;
+	else if (pg_strcasecmp(opname, "lock") == 0)
+		return TGL_LOCK;
+	else if (pg_strcasecmp(opname, "unlock") == 0)
+		return TGL_UNLOCK;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_SYNTAX_ERROR),
+			 errmsg("invalid operation name: \"%s\"", opname)));
+}
+
+/* Scan and return a possibly schema-qualified identifier. */
+static RangeVar *
+scan_qualified_identifier(StringInfo buf)
+{
+	char *name1;
+	char *name2;
+
+	name1 = scan_identifier(buf);
+	if (name1 == NULL)
+		report_syntax_error(buf);
+	if (buf->data[buf->cursor] != '.')
+		return makeRangeVar(NULL, name1, -1);
+	buf->cursor++;
+	name2 = scan_identifier(buf);
+	if (name2 == NULL)
+		report_syntax_error(buf);
+	return makeRangeVar(name1, name2, -1);
+}
+
+/* Scan and return a quoted single-part identifier. */
+static char *
+scan_quoted_identifier(StringInfo buf)
+{
+	StringInfoData result;
+
+	initStringInfo(&result);
+
+	if (buf->data[buf->cursor] != '"')
+		return NULL;
+
+	while (++buf->cursor < buf->len)
+	{
+		char   *s;
+
+		/* If we see a byte that is not a quote, append to result. */
+		s = &buf->data[buf->cursor];
+		if (s[0] != '"')
+		{
+			appendStringInfoChar(&result, s[0]);
+			continue;
+		}
+
+		/* If we see a byte that is a quote, check for a following quote. */
+		if (++buf->cursor < buf->len && s[1] == '"')
+		{
+			appendStringInfoChar(&result, s[0]);
+			continue;
+		}
+
+		/* We've found the terminating quote, so stop here. */
+		return result.data;
+	}
+
+	/* We ran off the end of the buffer with no close-quote.  Oops. */
+	return NULL;
+}
+
+/* Process a message from a background worker. */
+static void
+process_message(HTAB *worker_hash, worker_info *info, char *message,
+				Size message_bytes)
+{
+	StringInfoData msg;
+	char msgtype;
+	const char *tag;
+
+	initStringInfo(&msg);
+	enlargeStringInfo(&msg, message_bytes);
+	appendBinaryStringInfo(&msg, message, message_bytes);
+	msgtype = pq_getmsgbyte(&msg);
+
+	if (msgtype == 'E' || msgtype == 'N')
+	{
+		ErrorData	edata;
+		ErrorContextCallback context;
+
+		pq_parse_errornotice(&msg, &edata);
+		edata.elevel = Min(edata.elevel, ERROR);
+		context.callback = error_callback;
+		context.arg = info;
+		context.previous = error_context_stack;
+		error_context_stack = &context;
+		ThrowErrorData(&edata);
+		error_context_stack = context.previous;
+		return;
+	}
+
+	/* Not error or notice, so must be command complete. */
+	if (msgtype != 'C')
+		elog(ERROR, "unknown message type: %c (%zu bytes)",
+			 msg.data[0], message_bytes);
+	tag = pq_getmsgstring(&msg);
+
+	/* Hopefully we were waiting for a response... */
+	if (!info->awaiting_response)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unexpected acknowledgement from worker %d.%d: \"%s\"",
+					info->key.group_id, info->key.task_id, tag)));
+	info->awaiting_response = false;
+
+	/* If the client indicates that it will stop, detach from it. */
+	if (strcmp(tag, "STOP") == 0)
+	{
+		dsm_detach(info->seg);
+		hash_search(worker_hash, &info->key, HASH_REMOVE, NULL);
+	}
+}
+
+/* Send a command to a background worker. */
+static void
+send_command(HTAB *worker_hash, TestGroupLockStep *step)
+{
+	worker_key key;
+	bool	found;
+	worker_info *info;
+	shm_mq_result	result;
+
+	key.group_id = step->group_id;
+	key.task_id = step->task_id;
+	info = hash_search(worker_hash, &key, HASH_FIND, &found);
+	Assert(found);
+
+	/* Display progress report. */
+	switch (step->command.op)
+	{
+		case TGL_STOP:
+			ereport(NOTICE,
+					(errmsg("stopping worker %d.%d",
+							step->group_id, step->task_id)));
+			break;
+
+		case TGL_LOCK:
+			ereport(NOTICE,
+					(errmsg("instructing worker %d.%d to acquire %s on relation with OID %u",
+							step->group_id, step->task_id,
+							lock_mode_names[step->command.lockmode],
+							step->command.relid)));
+			break;
+
+		case TGL_UNLOCK:
+			ereport(NOTICE,
+					(errmsg("instructing worker %d.%d to release %s on relation with OID %u",
+							step->group_id, step->task_id,
+							lock_mode_names[step->command.lockmode],
+							step->command.relid)));
+			break;
+		default:
+			elog(ERROR, "bad operation code: %d", (int) step->command.op);
+	}
+
+	/* Transmit command to worker. */
+	result = shm_mq_send(info->requesth, sizeof(TestGroupLockCommand),
+						 &step->command, false);
+	if (result != SHM_MQ_SUCCESS)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("connection to background worker lost")));
+	info->awaiting_response = true;
+	for (;;)
+	{
+		check_for_messages(worker_hash);
+		if (!info->awaiting_response)
+			break;
+		WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
+		CHECK_FOR_INTERRUPTS();
+		ResetLatch(&MyProc->procLatch);
+	}
+
+}
+
+/* Start a background worker. */
+static void
+start_worker(HTAB *worker_hash, int group_id, int task_id, int leader_task_id)
+{
+	worker_key	key;
+	worker_info *info;
+	bool		found;
+	shm_toc_estimator	e;
+	Size		segsize;
+	shm_toc *toc;
+	worker_fixed_data *fdata;
+	shm_mq *requestq;
+	shm_mq *responseq;
+	BackgroundWorker worker;
+
+	/* Set up entry in hash table. */
+	key.group_id = group_id;
+	key.task_id = task_id;
+	info = hash_search(worker_hash, &key, HASH_ENTER, &found);
+	Assert(!found);
+	memset(((char *) info) + sizeof(worker_key), 0,
+		   sizeof(worker_info) - sizeof(worker_key));
+
+	/* Log a message explaining what we're going to do. */
+	if (leader_task_id < 0)
+		ereport(NOTICE,
+				(errmsg("starting worker %d.%d", group_id, task_id)));
+	else if (task_id == leader_task_id)
+		ereport(NOTICE,
+				(errmsg("starting worker %d.%d as group leader",
+						group_id, task_id)));
+	else
+		ereport(NOTICE,
+				(errmsg("starting worker %d.%d with group leader %d.%d",
+						group_id, task_id, group_id, leader_task_id)));
+
+	/* Create dynamic shared memory segment and table of contents. */
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(worker_fixed_data));
+	shm_toc_estimate_chunk(&e, SHM_QUEUE_SIZE);
+	shm_toc_estimate_chunk(&e, SHM_QUEUE_SIZE);
+	shm_toc_estimate_keys(&e, 3);
+	segsize = shm_toc_estimate(&e);
+	info->seg = dsm_create(segsize);
+	toc = shm_toc_create(TEST_GROUP_LOCKING_MAGIC,
+						 dsm_segment_address(info->seg), segsize);
+
+	/* Store fixed-size data in dynamic shared memory. */
+	fdata = shm_toc_allocate(toc, sizeof(worker_fixed_data));
+	fdata->database_id = MyDatabaseId;
+	fdata->authenticated_user_id = GetAuthenticatedUserId();
+	namestrcpy(&fdata->database, get_database_name(MyDatabaseId));
+	namestrcpy(&fdata->authenticated_user,
+			   GetUserNameFromId(fdata->authenticated_user_id));
+	shm_toc_insert(toc, 0, fdata);
+	if (leader_task_id >= 0)
+	{
+		fdata->use_group_locking = true;
+		if (task_id == leader_task_id)
+			fdata->leader_pid = 0;
+		else 
+		{
+			worker_key lkey;
+			worker_info *leader_info;
+
+			lkey.group_id = group_id;
+			lkey.task_id = leader_task_id;
+			leader_info = hash_search(worker_hash, &lkey, HASH_ENTER, &found);
+			Assert(found);
+			if (GetBackgroundWorkerPid(leader_info->handle, &fdata->leader_pid)
+				!= BGWH_STARTED)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("could not determine PID of leader %d.%d",
+								group_id, leader_task_id)));
+		}
+	}
+
+	/* Establish message queues in dynamic shared memory. */
+	requestq = shm_mq_create(shm_toc_allocate(toc, SHM_QUEUE_SIZE),
+							 SHM_QUEUE_SIZE);
+	shm_toc_insert(toc, 1, requestq);
+	shm_mq_set_sender(requestq, MyProc);
+	info->requesth = shm_mq_attach(requestq, info->seg, NULL);
+	responseq = shm_mq_create(shm_toc_allocate(toc, SHM_QUEUE_SIZE),
+							  SHM_QUEUE_SIZE);
+	shm_toc_insert(toc, 2, responseq);
+	shm_mq_set_receiver(responseq, MyProc);
+	info->responseh = shm_mq_attach(responseq, info->seg, NULL);
+
+	/* Configure a worker. */
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_main = NULL;       /* new worker might not have library loaded */
+	sprintf(worker.bgw_library_name, "test_group_locking");
+	sprintf(worker.bgw_function_name, "test_group_locking_worker_main");
+	snprintf(worker.bgw_name, BGW_MAXLEN,
+		"test_group_locking %d/%d by PID %d", group_id, task_id, MyProcPid);
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(info->seg));
+	/* set bgw_notify_pid, so we can detect if the worker stops */
+	worker.bgw_notify_pid = MyProcPid;
+
+	/* Register the worker. */
+	if (!RegisterDynamicBackgroundWorker(&worker, &info->handle))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not register background process"),
+				 errhint("You may need to increase max_worker_processes.")));
+	shm_mq_set_handle(info->requesth, info->handle);
+	shm_mq_set_handle(info->responseh, info->handle);
+
+	/* Wait for the worker to come online. */
+	info->awaiting_response = true;
+	for (;;)
+	{
+		check_for_messages(worker_hash);
+		if (!info->awaiting_response)
+			break;
+		WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
+		CHECK_FOR_INTERRUPTS();
+		ResetLatch(&MyProc->procLatch);
+	}
+}
+
+/* Background worker entrypoint. */
+void
+test_group_locking_worker_main(Datum main_arg)
+{
+	dsm_segment *seg;
+	shm_toc *toc;
+	shm_mq *requestq;
+	shm_mq *responseq;
+	shm_mq_handle *requesth;
+	shm_mq_handle *responseh;
+	worker_fixed_data *fdata;
+
+	/* Establish signal handlers. */
+	pqsignal(SIGTERM, handle_sigterm);
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up a memory context and resource owner. */
+	Assert(CurrentResourceOwner == NULL);
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "test_group_locking");
+	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+												 "test_group_locking",
+												 ALLOCSET_DEFAULT_MINSIZE,
+												 ALLOCSET_DEFAULT_INITSIZE,
+												 ALLOCSET_DEFAULT_MAXSIZE);
+
+	/* Connect to the dynamic shared memory segment. */
+	seg = dsm_attach(DatumGetInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unable to map dynamic shared memory segment")));
+	toc = shm_toc_attach(TEST_GROUP_LOCKING_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/* Find shared memory queues and attach to them. */
+	requestq = shm_toc_lookup(toc, 1);
+	shm_mq_set_receiver(requestq, MyProc);
+	requesth = shm_mq_attach(requestq, seg, NULL);
+	responseq = shm_toc_lookup(toc, 2);
+	shm_mq_set_sender(responseq, MyProc);
+	responseh = shm_mq_attach(responseq, seg, NULL);
+	pq_redirect_to_shm_mq(responseq, responseh);
+
+	/* Connect to database. */
+	fdata = shm_toc_lookup(toc, 0);
+	BackgroundWorkerInitializeConnection(NameStr(fdata->database),
+										 NameStr(fdata->authenticated_user));
+	if (fdata->database_id != MyDatabaseId ||
+		fdata->authenticated_user_id != GetAuthenticatedUserId())
+		ereport(ERROR,
+			(errmsg("user or database renamed during worker startup")));
+
+	/* Activate group locking, if appropriate. */
+	if (fdata->use_group_locking)
+	{
+		if (fdata->leader_pid == 0)
+			BecomeLockGroupLeader();
+		else
+		{
+			PGPROC *proc;
+
+			/*
+			 * This is a cheesy hack that I'm going with for the sake of
+			 * getting this test code running.  Don't really do it this way!
+			 *
+			 * In a real parallel computation, all of the workers in a lock
+			 * group would be started by the same process, which should pass
+			 * its own value of MyProc and its pid to those followers.  That
+			 * way, if the leader exits before the children are up and running,
+			 * they'll fail to join the lock group unless (a) the same PID
+			 * is again running and (b) it is a PostgreSQL process and (c) it
+			 * it using the same PGPROC as before and (d) it is again a lock
+			 * group leader.  Looking up the proc using the PID, as we're doing
+			 * here, loses the third of those guarantees - which is not a
+			 * catastrophe, but best avoided.
+			 */
+
+			proc = BackendPidGetProc(fdata->leader_pid);
+			if (proc == NULL
+				|| !BecomeLockGroupFollower(proc, fdata->leader_pid))
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("could not join lock group for leader PID %d",
+								fdata->leader_pid)));
+		}
+	}
+
+	/* Inform the worker who started us that we're up and running. */
+	pq_putmessage('C', "START", 6);
+
+	/* Begin a transaction. */
+	StartTransactionCommand();
+
+	/* Main loop: read and process messages. */
+	for (;;)
+	{
+		Size		nbytes;
+		void	   *data;
+		shm_mq_result	result;
+		TestGroupLockCommand *command;
+
+		result = shm_mq_receive(requesth, &nbytes, &data, false);
+		if (result != SHM_MQ_SUCCESS)
+			ereport(FATAL,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("connection to user backend lost")));
+		if (nbytes != sizeof(TestGroupLockCommand))
+			ereport(FATAL,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("invalid command message from user backend")));
+		command = data;
+
+		switch (command->op)
+		{
+			case TGL_STOP:
+				CommitTransactionCommand();
+				pq_putmessage('C', "STOP", 5);
+				exit(0);
+				break;
+
+			case TGL_LOCK:
+				if (!ConditionalLockRelationOid(command->relid,
+												command->lockmode))
+					ereport(ERROR,
+							(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+						 errmsg("could not obtain %s on relation with OID %u",
+								 lock_mode_names[command->lockmode],
+								 command->relid)));
+				pq_putmessage('C', "LOCK", 5);
+				break;
+
+			case TGL_UNLOCK:
+				UnlockRelationOid(command->relid, command->lockmode);
+				pq_putmessage('C', "UNLOCK", 7);
+				break;
+
+			default:
+				elog(ERROR, "unknown operation: %d", (int) command->op);
+		}
+	}		
+}
diff --git a/contrib/test_group_locking/test_group_locking.control b/contrib/test_group_locking/test_group_locking.control
new file mode 100644
index 0000000..3b69359
--- /dev/null
+++ b/contrib/test_group_locking/test_group_locking.control
@@ -0,0 +1,4 @@
+comment = 'Test code for group locking'
+default_version = '1.0'
+module_pathname = '$libdir/test_group_locking'
+relocatable = true
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to