Attached is a contrib module that lets you launch arbitrary commands in
a background worker, and supporting infrastructure patches for core.
You can launch queries and fetch the results back, much as you could
do with a dblink connection back to the local database but without the
hassles of dealing with authentication; and you can also run utility
commands, like VACUUM.  For people who have always wanted to be able
to launch a vacuum (or an autonomous transaction, or a background
task) from a procedural language ... enjoy.

Here's an example of running vacuum and then fetching the results.
Notice that the notices from the background worker are propagated to
our session; if an error had occurred, it would be re-thrown locally
when we try to read the results.

rhaas=# create table foo (a int);
CREATE TABLE
rhaas=# select pg_background_launch('vacuum verbose foo');
 pg_background_launch
----------------------
                51101
(1 row)

rhaas=# select * from pg_background_result(51101) as (x text);
INFO:  vacuuming "public.foo"
INFO:  "foo": found 0 removable, 0 nonremovable row versions in 0 out of 0 pages
DETAIL:  0 dead row versions cannot be removed yet.
There were 0 unused item pointers.
0 pages are entirely empty.
CPU 0.00s/0.00u sec elapsed 0.00 sec.
   x
--------
 VACUUM
(1 row)

Here's an overview of the attached patches:

Patches 1 and 2 add a few new interfaces to the shm_mq and dsm APIs
that happen to be convenient for later patches in the series.  I'm
pretty sure I could make all this work without these, but it would
take more code and be less efficient, so here they are.
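
As a rough illustration of the gather-write idea behind shm_mq_sendv,
here is a minimal standalone sketch.  It is not code from the patch:
toy_iovec, toy_queue, toy_sendv and toy_send are hypothetical names, the
"queue" is a flat byte array rather than a shared-memory ring buffer, and
the MAXALIGN chunking and the blocking/nowait behavior of the real
function are deliberately left out.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for shm_mq_iovec: one piece of a message. */
typedef struct
{
	const char *data;
	size_t		len;
} toy_iovec;

/* Hypothetical stand-in for a queue: a flat buffer, no ring, no locking. */
typedef struct
{
	char		buf[256];
	size_t		used;
} toy_queue;

/*
 * Append one message gathered from iovcnt pieces.  The record is a size_t
 * length word followed by the concatenated payload.  Returns 0 on success,
 * or -1 if the message does not fit (a real queue would wait instead).
 */
static int
toy_sendv(toy_queue *q, const toy_iovec *iov, int iovcnt)
{
	size_t		nbytes = 0;
	size_t		avail;
	int			i;

	assert(q != NULL && iov != NULL && iovcnt > 0);
	assert(q->used <= sizeof(q->buf));

	/* Compute total size of write. */
	for (i = 0; i < iovcnt; ++i)
		nbytes += iov[i].len;

	/* Make sure the length word plus the payload fits. */
	avail = sizeof(q->buf) - q->used;
	if (avail < sizeof(nbytes) || nbytes > avail - sizeof(nbytes))
		return -1;

	/* Length word first, then each piece in order. */
	memcpy(q->buf + q->used, &nbytes, sizeof(nbytes));
	q->used += sizeof(nbytes);
	for (i = 0; i < iovcnt; ++i)
	{
		if (iov[i].len > 0)
			memcpy(q->buf + q->used, iov[i].data, iov[i].len);
		q->used += iov[i].len;
	}
	return 0;
}

/* Single-buffer send expressed as a one-element gather write. */
static int
toy_send(toy_queue *q, const void *data, size_t nbytes)
{
	toy_iovec	iov;

	iov.data = data;
	iov.len = nbytes;
	return toy_sendv(q, &iov, 1);
}
```

A caller that wants to send a one-byte message type followed by a payload
can pass the two pieces as a two-element array, without first copying
them into a contiguous buffer.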

Patch 3 adds the ability for a backend to request that the protocol
messages it would normally send to the frontend get redirected to a
shm_mq.  I did this by adding a couple of hook functions.  The best
design is definitely arguable here, so if you'd like to bikeshed, this
is probably the patch to look at.  This patch also adds a function to
help you parse an ErrorResponse or NoticeResponse and re-throw the
error or notice in the originating backend.  Obviously, parallelism is
going to need this kind of functionality, but I suspect that a variety
of other applications that people develop using background workers may
want it too; and it's certainly important for pg_background itself.
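
The hook approach can be sketched in isolation roughly as follows.  This
is only an illustration under simplifying assumptions, not the patch
itself: every toy_* name is hypothetical, both destinations are plain
static buffers, and there is no reentrancy guard or detach handling.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical hook, modeled on pq_putmessage_hook; NULL = not redirected. */
static int	(*toy_putmessage_hook) (char msgtype, const char *s, size_t len) = NULL;

/* Where the two paths leave their output in this sketch. */
static char toy_socket_buf[128];
static size_t toy_socket_used = 0;
static char toy_queue_buf[128];
static size_t toy_queue_used = 0;

/* Append msgtype plus payload; returns 0, or -1 if it does not fit. */
static int
toy_append(char *buf, size_t bufsize, size_t *used,
		   char msgtype, const char *s, size_t len)
{
	assert(*used <= bufsize);
	if (len >= bufsize - *used)
		return -1;
	buf[(*used)++] = msgtype;
	if (len > 0)
		memcpy(buf + *used, s, len);
	*used += len;
	return 0;
}

/* Hook implementation: send the message to the queue instead. */
static int
toy_putmessage_mq(char msgtype, const char *s, size_t len)
{
	return toy_append(toy_queue_buf, sizeof(toy_queue_buf),
					  &toy_queue_used, msgtype, s, len);
}

/* Normal entry point: defer to the hook if one is installed. */
static int
toy_putmessage(char msgtype, const char *s, size_t len)
{
	assert(s != NULL || len == 0);
	if (toy_putmessage_hook != NULL)
		return toy_putmessage_hook(msgtype, s, len);
	return toy_append(toy_socket_buf, sizeof(toy_socket_buf),
					  &toy_socket_used, msgtype, s, len);
}

/* Install the hook, in the spirit of pq_redirect_to_shm_mq(). */
static void
toy_redirect_to_queue(void)
{
	toy_putmessage_hook = toy_putmessage_mq;
}
```

Callers keep using the normal entry point; once the redirect function has
run, their messages land in the queue buffer and the socket buffer is no
longer touched.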

Patch 4 adds infrastructure that allows one session to save all of its
non-default GUC values and another session to reload those values.
This was written by Amit Khandekar and Noah Misch.  It allows
pg_background to start up the background worker with the same GUC
settings that the launching process is using.  I intend this as a
demonstration of how to synchronize any given piece of state between
cooperating backends.  For real parallelism, we'll need to synchronize
snapshots, combo CIDs, transaction state, and so on, in addition to
GUCs.  But GUCs are ONE of the things that we'll need to synchronize
in that context, and this patch shows the kind of API we're thinking
about for these sorts of problems.
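
To give a feel for the save-and-reload pattern, here is a small
standalone sketch.  It is not the patch's API: toy_setting,
toy_serialize and toy_restore are hypothetical names, settings are plain
integers, and the wire format (NUL-terminated name, NUL-terminated value,
empty name as terminator) is an assumption made for this example only.
It does mirror two ideas described in the patch: settings still at their
default are skipped when saving, and the receiver resets to defaults
before applying what was sent.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical setting: an integer with a compiled-in default. */
typedef struct
{
	const char *name;
	int			value;
	int			boot_value;
} toy_setting;

/* Copy str including its terminator; -1 if it does not fit. */
static int
toy_put(char *buf, size_t maxbytes, size_t *used, const char *str)
{
	size_t		need = strlen(str) + 1;

	assert(*used <= maxbytes);
	if (need > maxbytes - *used)
		return -1;
	memcpy(buf + *used, str, need);
	*used += need;
	return 0;
}

/*
 * Save every non-default setting as "name\0value\0", then an empty name.
 * Returns the number of bytes used, or 0 if the buffer is too small.
 */
static size_t
toy_serialize(const toy_setting *set, int n, char *buf, size_t maxbytes)
{
	size_t		used = 0;
	int			i;

	assert(buf != NULL && maxbytes > 0);
	for (i = 0; i < n; i++)
	{
		char		tmp[32];

		if (set[i].value == set[i].boot_value)
			continue;			/* defaults are not sent */
		snprintf(tmp, sizeof(tmp), "%d", set[i].value);
		if (toy_put(buf, maxbytes, &used, set[i].name) != 0 ||
			toy_put(buf, maxbytes, &used, tmp) != 0)
			return 0;
	}
	if (toy_put(buf, maxbytes, &used, "") != 0)
		return 0;
	return used;
}

/* Reset to defaults, then apply each saved name/value pair. */
static void
toy_restore(toy_setting *set, int n, const char *buf)
{
	int			i;

	for (i = 0; i < n; i++)
		set[i].value = set[i].boot_value;
	while (*buf != '\0')
	{
		const char *name = buf;
		const char *val = name + strlen(name) + 1;

		for (i = 0; i < n; i++)
			if (strcmp(set[i].name, name) == 0)
				set[i].value = (int) strtol(val, NULL, 10);
		buf = val + strlen(val) + 1;
	}
}
```

The launching side would call the serialize function into a buffer that
both processes can see, and the worker would call the restore function on
that buffer before running anything.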

Patch 5 is a trivial patch to add a function to get the authenticated
user ID.  Noah pointed out to me that it's important for the
authenticated user ID, session user ID, and current user ID to all
match between the original session and the background worker.
Otherwise, pg_background could be used to circumvent restrictions that
we normally impose when those values differ from each other.  The
session and current user IDs are restored by the GUC save-and-restore
machinery ("session_authorization" and "role") but the authenticated
user ID requires special treatment.  To make that happen, it has to be
exposed somehow.

Patch 6 is pg_background itself.  I'm quite pleased with how easily
this came together.  The existing background worker, dsm, shm_toc, and
shm_mq infrastructure handles most of the heavy lifting here -
obviously with some exceptions addressed by the preceding patches.
Again, this is the kind of set-up that I'm expecting will happen in a
background worker used for actual parallelism - clearly, more state
will need to be restored there than here, but nonetheless the general
flow of the code here is about what I'm imagining, just with a few
more kinds of state.  Most of the work of writing this patch
was actually figuring out how to execute the query itself; what I
ended up with is mostly copied from exec_simple_query, but with some
differences here and there.  I'm not sure if it would be
possible/advisable to try to refactor to reduce duplication.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 82c93205d2dba8cf82c2753dc0b90540810f0a57 Mon Sep 17 00:00:00 2001
From: Robert Haas <rh...@postgresql.org>
Date: Thu, 17 Jul 2014 14:51:27 -0400
Subject: [PATCH 1/6] Extend shm_mq API with new functions shm_mq_sendv and
 shm_mq_set_handle.

shm_mq_sendv sends a message to the queue assembled from multiple
locations.  This is expected to be used by forthcoming patches to
allow frontend/backend protocol messages to be sent via shm_mq, but
might be useful for other purposes as well.

shm_mq_set_handle associates a BackgroundWorkerHandle with an
already-existing shm_mq_handle.  This solves a timing problem when
creating a shm_mq to communicate with a newly-launched background
worker: if you attach to the queue first, and the background worker
fails to start, you might block forever trying to do I/O on the
queue; but if you start the background worker first, but then die
before attaching to the queue, the background worker might block
forever trying to do I/O on the queue.  This lets you attach before
starting the worker (so that the worker is protected) and then associate
the BackgroundWorkerHandle later (so that you are also protected).
---
 src/backend/storage/ipc/shm_mq.c |  126 +++++++++++++++++++++++++++++++++-----
 src/include/storage/shm_mq.h     |   14 ++++-
 2 files changed, 124 insertions(+), 16 deletions(-)

diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index d96627a..90df593 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -139,7 +139,7 @@ struct shm_mq_handle
 };
 
 static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
-				  void *data, bool nowait, Size *bytes_written);
+				  const void *data, bool nowait, Size *bytes_written);
 static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
 					 bool nowait, Size *nbytesp, void **datap);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile * ptr,
@@ -301,7 +301,33 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 }
 
 /*
+ * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
+ * been passed to shm_mq_attach.
+ */
+void
+shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
+{
+	Assert(mqh->mqh_handle == NULL);
+	mqh->mqh_handle = handle;
+}
+
+/*
  * Write a message into a shared message queue.
+ */
+shm_mq_result
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
+{
+	shm_mq_iovec	iov;
+
+	iov.data = data;
+	iov.len = nbytes;
+
+	return shm_mq_sendv(mqh, &iov, 1, nowait);
+}
+
+/*
+ * Write a message into a shared message queue, gathered from multiple
+ * addresses.
  *
  * When nowait = false, we'll wait on our process latch when the ring buffer
  * fills up, and then continue writing once the receiver has drained some data.
@@ -315,14 +341,22 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
  * the length or payload will corrupt the queue.)
  */
 shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
+shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 {
 	shm_mq_result res;
 	shm_mq	   *mq = mqh->mqh_queue;
+	Size		nbytes = 0;
 	Size		bytes_written;
+	int			i;
+	int			which_iov = 0;
+	Size		offset;
 
 	Assert(mq->mq_sender == MyProc);
 
+	/* Compute total size of write. */
+	for (i = 0; i < iovcnt; ++i)
+		nbytes += iov[i].len;
+
 	/* Try to write, or finish writing, the length word into the buffer. */
 	while (!mqh->mqh_length_word_complete)
 	{
@@ -348,18 +382,80 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
 
 	/* Write the actual data bytes into the buffer. */
 	Assert(mqh->mqh_partial_bytes <= nbytes);
-	res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_bytes,
-							((char *) data) + mqh->mqh_partial_bytes,
-							nowait, &bytes_written);
-	if (res == SHM_MQ_WOULD_BLOCK)
-		mqh->mqh_partial_bytes += bytes_written;
-	else
+	offset = mqh->mqh_partial_bytes;
+	do
 	{
-		mqh->mqh_partial_bytes = 0;
-		mqh->mqh_length_word_complete = false;
-	}
-	if (res != SHM_MQ_SUCCESS)
-		return res;
+		Size	chunksize;
+
+		/* Figure out which bytes need to be sent next. */
+		if (offset >= iov[which_iov].len)
+		{
+			offset -= iov[which_iov].len;
+			++which_iov;
+			if (which_iov >= iovcnt)
+				break;
+			continue;
+		}
+
+		/*
+		 * We want to avoid copying the data if at all possible, but every
+		 * chunk of bytes we write into the queue has to be MAXALIGN'd,
+		 * except the last.  Thus, if a chunk other than the last one ends
+		 * on a non-MAXALIGN'd boundary, we have to combine the tail end of
+		 * its data with data from one or more following chunks until we
+		 * either reach the last chunk or accumulate a number of bytes which
+		 * is MAXALIGN'd.
+		 */
+		if (which_iov + 1 < iovcnt &&
+			offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
+		{
+			char	tmpbuf[MAXIMUM_ALIGNOF];
+			int		j = 0;
+
+			for (;;)
+			{
+				if (offset < iov[which_iov].len)
+				{
+					tmpbuf[j] = iov[which_iov].data[offset];
+					j++;
+					offset++;
+					if (j == MAXIMUM_ALIGNOF)
+						break;
+				}
+				else
+				{
+					offset -= iov[which_iov].len;
+					which_iov++;
+					if (which_iov >= iovcnt)
+						break;
+				}
+			}
+			res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
+			mqh->mqh_partial_bytes += bytes_written;
+			if (res != SHM_MQ_SUCCESS)
+				return res;
+			continue;
+		}
+
+		/*
+		 * If this is the last chunk, we can write all the data, even if it
+		 * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
+		 * MAXALIGN_DOWN the write size.
+		 */
+		chunksize = iov[which_iov].len - offset;
+		if (which_iov + 1 < iovcnt)
+			chunksize = MAXALIGN_DOWN(chunksize);
+		res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
+								nowait, &bytes_written);
+		mqh->mqh_partial_bytes += bytes_written;
+		offset += bytes_written;
+		if (res != SHM_MQ_SUCCESS)
+			return res;
+	} while (mqh->mqh_partial_bytes < nbytes);
+
+	/* Reset for next message. */
+	mqh->mqh_partial_bytes = 0;
+	mqh->mqh_length_word_complete = false;
 
 	/* Notify receiver of the newly-written data, and return. */
 	return shm_mq_notify_receiver(mq);
@@ -653,8 +749,8 @@ shm_mq_detach(shm_mq *mq)
  * Write bytes into a shared message queue.
  */
 static shm_mq_result
-shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait,
-				  Size *bytes_written)
+shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
+				  bool nowait, Size *bytes_written)
 {
 	shm_mq	   *mq = mqh->mqh_queue;
 	Size		sent = 0;
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 5bae380..063400a 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -25,6 +25,13 @@ typedef struct shm_mq shm_mq;
 struct shm_mq_handle;
 typedef struct shm_mq_handle shm_mq_handle;
 
+/* Descriptors for a single write spanning multiple locations. */
+typedef struct
+{
+	const char  *data;
+	Size	len;
+} shm_mq_iovec;
+
 /* Possible results of a send or receive operation. */
 typedef enum
 {
@@ -52,12 +59,17 @@ extern PGPROC *shm_mq_get_sender(shm_mq *);
 extern shm_mq_handle *shm_mq_attach(shm_mq *mq, dsm_segment *seg,
 			  BackgroundWorkerHandle *handle);
 
+/* Associate worker handle with shm_mq. */
+extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
+
 /* Break connection. */
 extern void shm_mq_detach(shm_mq *);
 
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
-			Size nbytes, void *data, bool nowait);
+			Size nbytes, const void *data, bool nowait);
+extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
+			shm_mq_iovec *iov, int iovcnt, bool nowait);
 extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
 			   Size *nbytesp, void **datap, bool nowait);
 
-- 
1.7.9.6 (Apple Git-31.1)

From d982dbb620b6a5d9992b3259550c3e16e7e8e28a Mon Sep 17 00:00:00 2001
From: Robert Haas <rh...@postgresql.org>
Date: Wed, 23 Jul 2014 11:16:44 -0400
Subject: [PATCH 2/6] Extend dsm API with a new function dsm_unkeep_mapping.

This reassociates a dynamic shared memory handle previously passed to
dsm_keep_mapping with the current resource owner, so that it will be
cleaned up at the end of the current query.
---
 src/backend/storage/ipc/dsm.c |   18 ++++++++++++++++++
 src/include/storage/dsm.h     |    1 +
 2 files changed, 19 insertions(+)

diff --git a/src/backend/storage/ipc/dsm.c b/src/backend/storage/ipc/dsm.c
index a5c0084..6039beb 100644
--- a/src/backend/storage/ipc/dsm.c
+++ b/src/backend/storage/ipc/dsm.c
@@ -796,6 +796,24 @@ dsm_keep_mapping(dsm_segment *seg)
 }
 
 /*
+ * Arrange to remove a dynamic shared memory mapping at cleanup time.
+ *
+ * dsm_keep_mapping() can be used to preserve a mapping for the entire
+ * lifetime of a process; this function reverses that decision, making
+ * the segment owned by the current resource owner.  This may be useful
+ * just before performing some operation that will invalidate the segment
+ * for future use by this backend.
+ */
+void
+dsm_unkeep_mapping(dsm_segment *seg)
+{
+	Assert(seg->resowner == NULL);
+	ResourceOwnerEnlargeDSMs(CurrentResourceOwner);
+	seg->resowner = CurrentResourceOwner;
+	ResourceOwnerRememberDSM(seg->resowner, seg);
+}
+
+/*
  * Keep a dynamic shared memory segment until postmaster shutdown.
  *
  * This function should not be called more than once per segment;
diff --git a/src/include/storage/dsm.h b/src/include/storage/dsm.h
index 1d0110d..1694409 100644
--- a/src/include/storage/dsm.h
+++ b/src/include/storage/dsm.h
@@ -37,6 +37,7 @@ extern void dsm_detach(dsm_segment *seg);
 
 /* Resource management functions. */
 extern void dsm_keep_mapping(dsm_segment *seg);
+extern void dsm_unkeep_mapping(dsm_segment *seg);
 extern void dsm_keep_segment(dsm_segment *seg);
 extern dsm_segment *dsm_find_mapping(dsm_handle h);
 
-- 
1.7.9.6 (Apple Git-31.1)

From 5840a0f2c7a7546c98485382e2c173b6be943f54 Mon Sep 17 00:00:00 2001
From: Robert Haas <rh...@postgresql.org>
Date: Wed, 28 May 2014 19:30:21 -0400
Subject: [PATCH 3/6] Support frontend-backend protocol communication using a
 shm_mq.

A background worker can use pq_redirect_to_shm_mq() to direct protocol
messages that would normally be sent to the frontend to a shm_mq so that
another process may read them.

The receiving process may use pq_parse_errornotice() to parse an
ErrorResponse or NoticeResponse from the background worker and, if
it wishes, ThrowErrorData() to propagate the error (with or without
further modification).
---
 src/backend/libpq/Makefile       |    2 +-
 src/backend/libpq/pqcomm.c       |   13 +++
 src/backend/libpq/pqmq.c         |  202 ++++++++++++++++++++++++++++++++++++++
 src/backend/utils/adt/numutils.c |    2 +-
 src/backend/utils/error/elog.c   |   51 ++++++++++
 src/include/libpq/libpq.h        |    3 +
 src/include/libpq/pqmq.h         |   22 +++++
 src/include/utils/builtins.h     |    2 +-
 src/include/utils/elog.h         |    1 +
 9 files changed, 295 insertions(+), 3 deletions(-)
 create mode 100644 src/backend/libpq/pqmq.c
 create mode 100644 src/include/libpq/pqmq.h

diff --git a/src/backend/libpq/Makefile b/src/backend/libpq/Makefile
index e929864..f548b2f 100644
--- a/src/backend/libpq/Makefile
+++ b/src/backend/libpq/Makefile
@@ -15,6 +15,6 @@ include $(top_builddir)/src/Makefile.global
 # be-fsstubs is here for historical reasons, probably belongs elsewhere
 
 OBJS = be-fsstubs.o be-secure.o auth.o crypt.o hba.o ip.o md5.o pqcomm.o \
-       pqformat.o pqsignal.o
+       pqformat.o pqmq.o pqsignal.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 605d891..b02fb0e 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -102,6 +102,9 @@
 int			Unix_socket_permissions;
 char	   *Unix_socket_group;
 
+/* Hooks for protocol message redirect */
+int	(*pq_putmessage_hook)(char msgtype, const char *s, size_t len);
+int	(*pq_flush_hook)(void);
 
 /* Where the Unix socket files are (list of palloc'd strings) */
 static List *sock_paths = NIL;
@@ -792,6 +795,11 @@ TouchSocketFiles(void)
 static void
 pq_set_nonblocking(bool nonblocking)
 {
+	if (MyProcPort == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+				 errmsg("there is no client connection")));
+
 	if (MyProcPort->noblock == nonblocking)
 		return;
 
@@ -1220,6 +1228,9 @@ pq_flush(void)
 {
 	int			res;
 
+	if (pq_flush_hook != NULL)
+		return pq_flush_hook();
+
 	/* No-op if reentrant call */
 	if (PqCommBusy)
 		return 0;
@@ -1378,6 +1389,8 @@ pq_is_send_pending(void)
 int
 pq_putmessage(char msgtype, const char *s, size_t len)
 {
+	if (pq_putmessage_hook != NULL)
+		return pq_putmessage_hook(msgtype, s, len);
 	if (DoingCopyOut || PqCommBusy)
 		return 0;
 	PqCommBusy = true;
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
new file mode 100644
index 0000000..c29cc7c
--- /dev/null
+++ b/src/backend/libpq/pqmq.c
@@ -0,0 +1,202 @@
+/*-------------------------------------------------------------------------
+ *
+ * pqmq.c
+ *	  Use the frontend/backend protocol for communication over a shm_mq
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *	src/backend/libpq/pqmq.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+
+static shm_mq *pq_mq;
+static shm_mq_handle *pq_mq_handle;
+static bool pq_mq_busy = false;
+
+static int pq_putmessage_mq(char msgtype, const char *s, size_t len);
+static int pq_flush_mq(void);
+
+/*
+ * Arrange to redirect frontend/backend protocol messages to a shared-memory
+ * message queue.
+ */
+void
+pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
+{
+	pq_putmessage_hook = pq_putmessage_mq;
+	pq_flush_hook = pq_flush_mq;
+	pq_mq = mq;
+	pq_mq_handle = mqh;
+	whereToSendOutput = DestRemote;
+	FrontendProtocol = PG_PROTOCOL_LATEST;
+}
+
+/*
+ * Transmit a libpq protocol message to the shared memory message queue
+ * selected via pq_mq_handle.  We don't include a length word, because the
+ * receiver will know the length of the message from shm_mq_receive().
+ */
+static int
+pq_putmessage_mq(char msgtype, const char *s, size_t len)
+{
+	shm_mq_iovec	iov[2];
+	shm_mq_result	result;
+
+	/*
+	 * If we're sending a message, and we have to wait because the
+	 * queue is full, and then we get interrupted, and that interrupt
+	 * results in trying to send another message, we respond by detaching
+	 * the queue.  There's no way to return to the original context, but
+	 * even if there were, just queueing the message would amount to
+	 * indefinitely postponing the response to the interrupt.  So we do
+	 * this instead.
+	 */
+	if (pq_mq_busy)
+	{
+		if (pq_mq != NULL)
+			shm_mq_detach(pq_mq);
+		pq_mq = NULL;
+		return EOF;
+	}
+
+	pq_mq_busy = true;
+
+	iov[0].data = &msgtype;
+	iov[0].len = 1;
+	iov[1].data = s;
+	iov[1].len = len;
+
+	Assert(pq_mq_handle != NULL);
+	result = shm_mq_sendv(pq_mq_handle, iov, 2, false);
+
+	pq_mq_busy = false;
+
+	Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
+	if (result != SHM_MQ_SUCCESS)
+		return EOF;
+	return 0;
+}
+
+/*
+ * Flush is not required.
+ */
+static int
+pq_flush_mq(void)
+{
+	return 0;
+}
+
+/*
+ * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
+ * structure with the results.
+ */
+void
+pq_parse_errornotice(StringInfo msg, ErrorData *edata)
+{
+	/* Initialize edata with reasonable defaults. */
+	MemSet(edata, 0, sizeof(ErrorData));
+	edata->elevel = ERROR;
+	edata->assoc_context = CurrentMemoryContext;
+
+	/* Loop over fields and extract each one. */
+	for (;;)
+	{
+		char	code = pq_getmsgbyte(msg);
+		const char *value;
+
+		if (code == '\0')
+		{
+			pq_getmsgend(msg);
+			break;
+		}
+	 	value = pq_getmsgstring(msg);
+
+		switch (code)
+		{
+			case PG_DIAG_SEVERITY:
+				if (strcmp(value, "DEBUG") == 0)
+					edata->elevel = DEBUG1;	/* or some other DEBUG level */
+				else if (strcmp(value, "LOG") == 0)
+					edata->elevel = LOG;	/* can't be COMMERROR */
+				else if (strcmp(value, "INFO") == 0)
+					edata->elevel = INFO;
+				else if (strcmp(value, "NOTICE") == 0)
+					edata->elevel = NOTICE;
+				else if (strcmp(value, "WARNING") == 0)
+					edata->elevel = WARNING;
+				else if (strcmp(value, "ERROR") == 0)
+					edata->elevel = ERROR;
+				else if (strcmp(value, "FATAL") == 0)
+					edata->elevel = FATAL;
+				else if (strcmp(value, "PANIC") == 0)
+					edata->elevel = PANIC;
+				else
+					elog(ERROR, "unknown error severity");
+				break;
+			case PG_DIAG_SQLSTATE:
+				if (strlen(value) != 5)
+					elog(ERROR, "malformed sql state");
+				edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2],
+												  value[3], value[4]);
+				break;
+			case PG_DIAG_MESSAGE_PRIMARY:
+				edata->message = pstrdup(value);
+				break;
+			case PG_DIAG_MESSAGE_DETAIL:
+				edata->detail = pstrdup(value);
+				break;
+			case PG_DIAG_MESSAGE_HINT:
+				edata->hint = pstrdup(value);
+				break;
+			case PG_DIAG_STATEMENT_POSITION:
+				edata->cursorpos = pg_atoi(value, sizeof(int), '\0');
+				break;
+			case PG_DIAG_INTERNAL_POSITION:
+				edata->internalpos = pg_atoi(value, sizeof(int), '\0');
+				break;
+			case PG_DIAG_INTERNAL_QUERY:
+				edata->internalquery = pstrdup(value);
+				break;
+			case PG_DIAG_CONTEXT:
+				edata->context = pstrdup(value);
+				break;
+			case PG_DIAG_SCHEMA_NAME:
+				edata->schema_name = pstrdup(value);
+				break;
+			case PG_DIAG_TABLE_NAME:
+				edata->table_name = pstrdup(value);
+				break;
+			case PG_DIAG_COLUMN_NAME:
+				edata->column_name = pstrdup(value);
+				break;
+			case PG_DIAG_DATATYPE_NAME:
+				edata->datatype_name = pstrdup(value);
+				break;
+			case PG_DIAG_CONSTRAINT_NAME:
+				edata->constraint_name = pstrdup(value);
+				break;
+			case PG_DIAG_SOURCE_FILE:
+				edata->filename = pstrdup(value);
+				break;
+			case PG_DIAG_SOURCE_LINE:
+				edata->lineno = pg_atoi(value, sizeof(int), '\0');
+				break;
+			case PG_DIAG_SOURCE_FUNCTION:
+				edata->funcname = pstrdup(value);
+				break;
+			default:
+				elog(ERROR, "unknown error field: %d", (int) code);
+				break;
+		}
+	}
+}
diff --git a/src/backend/utils/adt/numutils.c b/src/backend/utils/adt/numutils.c
index ca5a8a5..1d13363 100644
--- a/src/backend/utils/adt/numutils.c
+++ b/src/backend/utils/adt/numutils.c
@@ -34,7 +34,7 @@
  * overflow.
  */
 int32
-pg_atoi(char *s, int size, int c)
+pg_atoi(const char *s, int size, int c)
 {
 	long		l;
 	char	   *badp;
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 32a9663..2316464 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1577,6 +1577,57 @@ FlushErrorState(void)
 }
 
 /*
+ * ThrowErrorData --- report an error described by an ErrorData structure
+ *
+ * This is intended to be used to re-report errors originally thrown by
+ * background worker processes and then propagated (with or without
+ * modification) to the backend responsible for them.
+ */
+void
+ThrowErrorData(ErrorData *edata)
+{
+	ErrorData *newedata;
+	MemoryContext	oldcontext;
+
+	if (!errstart(edata->elevel, edata->filename, edata->lineno,
+				  edata->funcname, NULL))
+		return;
+
+	newedata = &errordata[errordata_stack_depth];
+	oldcontext = MemoryContextSwitchTo(edata->assoc_context);
+
+	/* Copy the supplied fields to the error stack. */
+	if (edata->sqlerrcode > 0)
+		newedata->sqlerrcode = edata->sqlerrcode;
+	if (edata->message)
+		newedata->message = pstrdup(edata->message);
+	if (edata->detail)
+		newedata->detail = pstrdup(edata->detail);
+	if (edata->detail_log)
+		newedata->detail_log = pstrdup(edata->detail_log);
+	if (edata->hint)
+		newedata->hint = pstrdup(edata->hint);
+	if (edata->context)
+		newedata->context = pstrdup(edata->context);
+	if (edata->schema_name)
+		newedata->schema_name = pstrdup(edata->schema_name);
+	if (edata->table_name)
+		newedata->table_name = pstrdup(edata->table_name);
+	if (edata->column_name)
+		newedata->column_name = pstrdup(edata->column_name);
+	if (edata->datatype_name)
+		newedata->datatype_name = pstrdup(edata->datatype_name);
+	if (edata->constraint_name)
+		newedata->constraint_name = pstrdup(edata->constraint_name);
+	if (edata->internalquery)
+		newedata->internalquery = pstrdup(edata->internalquery);
+
+	MemoryContextSwitchTo(oldcontext);
+
+	errfinish(0);
+}
+
+/*
  * ReThrowError --- re-throw a previously copied error
  *
  * A handler can do CopyErrorData/FlushErrorState to get out of the error
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index e4e354d..eee80b3 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -75,6 +75,9 @@ extern char *ssl_key_file;
 extern char *ssl_ca_file;
 extern char *ssl_crl_file;
 
+extern int	(*pq_putmessage_hook)(char msgtype, const char *s, size_t len);
+extern int  (*pq_flush_hook)(void);
+
 extern int	secure_initialize(void);
 extern bool secure_loaded_verify_locations(void);
 extern void secure_destroy(void);
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
new file mode 100644
index 0000000..6bb24d9
--- /dev/null
+++ b/src/include/libpq/pqmq.h
@@ -0,0 +1,22 @@
+/*-------------------------------------------------------------------------
+ *
+ * pqmq.h
+ *	  Use the frontend/backend protocol for communication over a shm_mq
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/libpq/pqmq.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PQMQ_H
+#define PQMQ_H
+
+#include "storage/shm_mq.h"
+
+extern void	pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *);
+
+extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
+
+#endif   /* PQMQ_H */
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index bbb5d39..2e8783f 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -285,7 +285,7 @@ extern Datum current_schema(PG_FUNCTION_ARGS);
 extern Datum current_schemas(PG_FUNCTION_ARGS);
 
 /* numutils.c */
-extern int32 pg_atoi(char *s, int size, int c);
+extern int32 pg_atoi(const char *s, int size, int c);
 extern void pg_itoa(int16 i, char *a);
 extern void pg_ltoa(int32 l, char *a);
 extern void pg_lltoa(int64 ll, char *a);
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 92073be..87438b8 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -415,6 +415,7 @@ extern ErrorData *CopyErrorData(void);
 extern void FreeErrorData(ErrorData *edata);
 extern void FlushErrorState(void);
 extern void ReThrowError(ErrorData *edata) __attribute__((noreturn));
+extern void ThrowErrorData(ErrorData *edata);
 extern void pg_re_throw(void) __attribute__((noreturn));
 
 extern char *GetErrorContextStack(void);
-- 
1.7.9.6 (Apple Git-31.1)

From 59d3cb93c566368d9cc9dd26b3ffd0924c98ee36 Mon Sep 17 00:00:00 2001
From: Robert Haas <rh...@postgresql.org>
Date: Thu, 17 Jul 2014 07:58:32 -0400
Subject: [PATCH 4/6] Add infrastructure to save and restore GUC values.

Amit Khandekar and Noah Misch, with a bit of further hacking by me.
---
 src/backend/utils/misc/guc.c |  397 ++++++++++++++++++++++++++++++++++++++++++
 src/include/utils/guc.h      |    5 +
 2 files changed, 402 insertions(+)

diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6c52db8..cb324f3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -117,6 +117,12 @@
 #define S_PER_D (60 * 60 * 24)
 #define MS_PER_D (1000 * 60 * 60 * 24)
 
+/*
+ * Precision with which REAL type guc values are to be printed for GUC
+ * serialization.
+ */
+#define REALTYPE_PRECISION 17
+
 /* XXX these should appear in other modules' header files */
 extern bool Log_disconnections;
 extern int	CommitDelay;
@@ -146,6 +152,10 @@ char	   *GUC_check_errmsg_string;
 char	   *GUC_check_errdetail_string;
 char	   *GUC_check_errhint_string;
 
+static void
+do_serialize(char **destptr, Size *maxbytes, const char *fmt,...)
+/* This lets gcc check the format string for consistency. */
+__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
 
 static void set_config_sourcefile(const char *name, char *sourcefile,
 					  int sourceline);
@@ -8409,6 +8419,393 @@ read_nondefault_variables(void)
 }
 #endif   /* EXEC_BACKEND */
 
+/*
+ * can_skip_gucvar:
+ * When serializing, determine whether to skip this GUC.  When restoring, the
+ * negation of this test determines whether to restore the compiled-in default
+ * value before processing serialized values.
+ *
+ * A PGC_S_DEFAULT setting on the serialize side will typically match new
+ * postmaster children, but that can be false when got_SIGHUP == true and the
+ * pending configuration change modifies this setting.  Nonetheless, we omit
+ * PGC_S_DEFAULT settings from serialization and make up for that by restoring
+ * defaults before applying serialized values.
+ *
+ * PGC_POSTMASTER variables always have the same value in every child of a
+ * particular postmaster.  Most PGC_INTERNAL variables are compile-time
+ * constants; a few, like server_encoding and lc_ctype, are handled specially
+ * outside the serialize/restore procedure.  Therefore, SerializeGUCState()
+ * never sends these, and RestoreGUCState() never changes them.
+ */
+static bool
+can_skip_gucvar(struct config_generic * gconf)
+{
+	return gconf->context == PGC_POSTMASTER ||
+		gconf->context == PGC_INTERNAL || gconf->source == PGC_S_DEFAULT;
+}
+
+/*
+ * estimate_variable_size:
+ * Estimate max size for dumping the given GUC variable.
+ */
+static Size
+estimate_variable_size(struct config_generic * gconf)
+{
+	Size		size;
+	Size		valsize;
+
+	if (can_skip_gucvar(gconf))
+		return 0;
+
+	size = 0;
+
+	size = add_size(size, strlen(gconf->name) + 1);
+
+	/* Get the maximum display length of the GUC value. */
+	switch (gconf->vartype)
+	{
+		case PGC_BOOL:
+			{
+				valsize = 5;	/* max(strlen('true'), strlen('false')) */
+			}
+			break;
+
+		case PGC_INT:
+			{
+				struct config_int *conf = (struct config_int *) gconf;
+
+				/*
+				 * Instead of getting the exact display length, use max
+				 * length.  Also reduce the max length for typical ranges of
+				 * small values.  Maximum value is 2147483647, i.e. 10 chars.
+				 * Include one byte for sign.
+				 */
+				if (abs(*conf->variable) < 1000)
+					valsize = 3 + 1;
+				else
+					valsize = 10 + 1;
+			}
+			break;
+
+		case PGC_REAL:
+			{
+				/*
+				 * We are going to print it with %.17g. Account for sign,
+				 * decimal point, and e+nnn notation. E.g.
+				 * -3.9932904234000002e+110
+				 */
+				valsize = REALTYPE_PRECISION + 1 + 1 + 5;
+			}
+			break;
+
+		case PGC_STRING:
+			{
+				struct config_string *conf = (struct config_string *) gconf;
+
+				valsize = strlen(*conf->variable);
+			}
+			break;
+
+		case PGC_ENUM:
+			{
+				struct config_enum *conf = (struct config_enum *) gconf;
+
+				valsize = strlen(config_enum_lookup_by_value(conf, *conf->variable));
+			}
+			break;
+	}
+
+	/* Valsize + NULL terminator */
+	size = add_size(size, valsize + 1);
+
+	if (gconf->sourcefile)
+		size = add_size(size, strlen(gconf->sourcefile));
+
+	/* NULL char */
+	size = add_size(size, 1);
+
+	/* Include line whenever we include file. */
+	if (gconf->sourcefile)
+		size = add_size(size, sizeof(gconf->sourceline));
+
+	size = add_size(size, sizeof(gconf->source));
+	size = add_size(size, sizeof(gconf->scontext));
+
+	return size;
+}
+
+/*
+ * EstimateGUCStateSpace:
+ * Returns the size needed to store the GUC state for the current process
+ */
+Size
+EstimateGUCStateSpace(void)
+{
+	Size		size;
+	int			i;
+
+	/* Add space required for saving the data size of the guc state */
+	size = sizeof(Size);
+
+	/* Add up the space needed for each GUC variable */
+	for (i = 0; i < num_guc_variables; i++)
+		size = add_size(size,
+						estimate_variable_size(guc_variables[i]));
+
+	return size;
+}
+
+/*
+ * do_serialize:
+ * Copies the formatted string into the destination.  Moves ahead the
+ * destination pointer, and decrements the maxbytes by that many bytes. If
+ * maxbytes is not sufficient to copy the string, error out.
+ */
+static void
+do_serialize(char **destptr, Size *maxbytes, const char *fmt,...)
+{
+	va_list		vargs;
+	int			n;
+
+	if (*maxbytes <= 0)
+		elog(ERROR, "not enough space to serialize GUC state");
+
+	va_start(vargs, fmt);
+	n = vsnprintf(*destptr, *maxbytes, fmt, vargs);
+	va_end(vargs);
+
+	/*
+	 * Cater to portability hazards in the vsnprintf() return value just like
+	 * appendPQExpBufferVA() does.  Note that this requires an extra byte of
+	 * slack at the end of the buffer.  Since serialize_variable() ends with a
+	 * do_serialize_binary() rather than a do_serialize(), we'll always have
+	 * that slack; estimate_variable_size() need not add a byte for it.
+	 */
+	if (n < 0 || n >= *maxbytes - 1)
+	{
+		if (n < 0 && errno != 0 && errno != ENOMEM)
+			/* Shouldn't happen. Better show errno description. */
+			elog(ERROR, "vsnprintf failed: %m");
+		else
+			elog(ERROR, "not enough space to serialize GUC state");
+	}
+
+	/* Shift the destptr ahead of the null terminator */
+	*destptr += n + 1;
+	*maxbytes -= n + 1;
+}
+
+/* Binary copy version of do_serialize() */
+static void
+do_serialize_binary(char **destptr, Size *maxbytes, void *val, Size valsize)
+{
+	if (valsize > *maxbytes)
+		elog(ERROR, "not enough space to serialize GUC state");
+
+	memcpy(*destptr, val, valsize);
+	*destptr += valsize;
+	*maxbytes -= valsize;
+}
+
+/*
+ * serialize_variable:
+ * Dumps name, value and other information of a GUC variable into destptr.
+ */
+static void
+serialize_variable(char **destptr, Size *maxbytes,
+				   struct config_generic * gconf)
+{
+	if (can_skip_gucvar(gconf))
+		return;
+
+	do_serialize(destptr, maxbytes, "%s", gconf->name);
+
+	switch (gconf->vartype)
+	{
+		case PGC_BOOL:
+			{
+				struct config_bool *conf = (struct config_bool *) gconf;
+
+				do_serialize(destptr, maxbytes,
+							 (*conf->variable ? "true" : "false"));
+			}
+			break;
+
+		case PGC_INT:
+			{
+				struct config_int *conf = (struct config_int *) gconf;
+
+				do_serialize(destptr, maxbytes, "%d", *conf->variable);
+			}
+			break;
+
+		case PGC_REAL:
+			{
+				struct config_real *conf = (struct config_real *) gconf;
+
+				do_serialize(destptr, maxbytes, "%.*g",
+							 REALTYPE_PRECISION, *conf->variable);
+			}
+			break;
+
+		case PGC_STRING:
+			{
+				struct config_string *conf = (struct config_string *) gconf;
+
+				do_serialize(destptr, maxbytes, "%s", *conf->variable);
+			}
+			break;
+
+		case PGC_ENUM:
+			{
+				struct config_enum *conf = (struct config_enum *) gconf;
+
+				do_serialize(destptr, maxbytes, "%s",
+						 config_enum_lookup_by_value(conf, *conf->variable));
+			}
+			break;
+	}
+
+	do_serialize(destptr, maxbytes, "%s",
+				 (gconf->sourcefile ? gconf->sourcefile : ""));
+
+	if (gconf->sourcefile)
+		do_serialize_binary(destptr, maxbytes, &gconf->sourceline,
+							sizeof(gconf->sourceline));
+
+	do_serialize_binary(destptr, maxbytes, &gconf->source,
+						sizeof(gconf->source));
+	do_serialize_binary(destptr, maxbytes, &gconf->scontext,
+						sizeof(gconf->scontext));
+}
+
+/*
+ * SerializeGUCState:
+ * Dumps the complete GUC state onto the memory location at start_address.
+ */
+void
+SerializeGUCState(Size maxsize, char *start_address)
+{
+	char	   *curptr;
+	Size		actual_size;
+	Size		bytes_left;
+	int			i;
+	int			i_role;
+
+	/* Reserve space for saving the actual size of the guc state */
+	curptr = start_address + sizeof(actual_size);
+	bytes_left = maxsize - sizeof(actual_size);
+
+	for (i = 0; i < num_guc_variables; i++)
+	{
+		/*
+		 * It's pretty ugly, but we've got to force "role" to be initialized
+		 * after "session_authorization"; otherwise, the latter will override
+		 * the former.
+		 */
+		if (strcmp(guc_variables[i]->name, "role") == 0)
+			i_role = i;
+		else
+			serialize_variable(&curptr, &bytes_left, guc_variables[i]);
+	}
+	serialize_variable(&curptr, &bytes_left, guc_variables[i_role]);
+
+	/* Store actual size without assuming alignment of start_address. */
+	actual_size = maxsize - bytes_left - sizeof(actual_size);
+	memcpy(start_address, &actual_size, sizeof(actual_size));
+}
+
+/*
+ * read_gucstate:
+ * Actually it does not read anything, just returns the srcptr. But it does
+ * move the srcptr past the terminating NULL char, so that the caller is ready
+ * to read the next string.
+ */
+static char *
+read_gucstate(char **srcptr, char *srcend)
+{
+	char	   *retptr = *srcptr;
+	char	   *ptr;
+
+	if (*srcptr >= srcend)
+		elog(ERROR, "incomplete GUC state");
+
+	/* The string variables are all null terminated */
+	for (ptr = *srcptr; ptr < srcend && *ptr != '\0'; ptr++)
+		;
+
+	if (ptr >= srcend)
+		elog(ERROR, "could not find null terminator in GUC state");
+
+	/* Set the new position at the position after the NULL character */
+	*srcptr = ptr + 1;
+
+	return retptr;
+}
+
+/* Binary read version of read_gucstate(). Copies into dest */
+static void
+read_gucstate_binary(char **srcptr, char *srcend, void *dest, Size size)
+{
+	if (*srcptr + size > srcend)
+		elog(ERROR, "incomplete GUC state");
+
+	memcpy(dest, *srcptr, size);
+	*srcptr += size;
+}
+
+/*
+ * RestoreGUCState:
+ * Reads the GUC state at the specified address and updates the GUCs with the
+ * values read from the GUC state.
+ */
+void
+RestoreGUCState(void *gucstate)
+{
+	char	   *varname,
+			   *varvalue,
+			   *varsourcefile;
+	int			varsourceline;
+	GucSource	varsource;
+	GucContext	varscontext;
+	char	   *srcptr = (char *) gucstate;
+	char	   *srcend;
+	Size		len;
+	int			i;
+
+	/* See comment at can_skip_gucvar(). */
+	for (i = 0; i < num_guc_variables; i++)
+		if (!can_skip_gucvar(guc_variables[i]))
+			InitializeOneGUCOption(guc_variables[i]);
+
+	/* First item is the length of the subsequent data */
+	memcpy(&len, gucstate, sizeof(len));
+
+	srcptr += sizeof(len);
+	srcend = srcptr + len;
+
+	while (srcptr < srcend)
+	{
+		if ((varname = read_gucstate(&srcptr, srcend)) == NULL)
+			break;
+
+		varvalue = read_gucstate(&srcptr, srcend);
+		varsourcefile = read_gucstate(&srcptr, srcend);
+		if (varsourcefile[0])
+			read_gucstate_binary(&srcptr, srcend,
+								 &varsourceline, sizeof(varsourceline));
+		read_gucstate_binary(&srcptr, srcend,
+							 &varsource, sizeof(varsource));
+		read_gucstate_binary(&srcptr, srcend,
+							 &varscontext, sizeof(varscontext));
+
+		(void) set_config_option(varname, varvalue,
+								 varscontext, varsource,
+								 GUC_ACTION_SET, true, 0);
+		if (varsourcefile[0])
+			set_config_sourcefile(varname, varsourcefile, varsourceline);
+	}
+}
 
 /*
  * A little "long argument" simulation, although not quite GNU
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 0a729c1..47cac5c 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -351,6 +351,11 @@ extern void write_nondefault_variables(GucContext context);
 extern void read_nondefault_variables(void);
 #endif
 
+/* GUC serialization */
+extern Size EstimateGUCStateSpace(void);
+extern void SerializeGUCState(Size maxsize, char *start_address);
+extern void RestoreGUCState(void *gucstate);
+
 /* Support for messages reported from GUC check hooks */
 
 extern PGDLLIMPORT char *GUC_check_errmsg_string;
-- 
1.7.9.6 (Apple Git-31.1)
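
For readers who want to see the buffer layout used by SerializeGUCState()
and RestoreGUCState() in isolation, here is a minimal standalone sketch.
It is not code from the patch.  It assumes only the layout visible above:
a length header, followed by NUL-terminated strings and raw binary
fields, with every read and write bounds-checked.  The helper names
(put_string, put_binary, get_string, get_binary, roundtrip_demo) and the
example values are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Append a NUL-terminated string; 0 on success, -1 if it does not fit. */
static int
put_string(char **dest, size_t *left, const char *s)
{
	size_t		need = strlen(s) + 1;

	if (need > *left)
		return -1;
	memcpy(*dest, s, need);
	*dest += need;
	*left -= need;
	return 0;
}

/* Append raw bytes; 0 on success, -1 if they do not fit. */
static int
put_binary(char **dest, size_t *left, const void *val, size_t size)
{
	if (size > *left)
		return -1;
	memcpy(*dest, val, size);
	*dest += size;
	*left -= size;
	return 0;
}

/* Return the string at *src and step past its NUL; NULL if none before end. */
static const char *
get_string(const char **src, const char *end)
{
	const char *start = *src;
	const char *nul;

	if (start >= end)
		return NULL;
	nul = memchr(start, '\0', (size_t) (end - start));
	if (nul == NULL)
		return NULL;
	*src = nul + 1;
	return start;
}

/* Copy out raw bytes; 0 on success, -1 if fewer than size bytes remain. */
static int
get_binary(const char **src, const char *end, void *dest, size_t size)
{
	if ((size_t) (end - *src) < size)
		return -1;
	memcpy(dest, *src, size);
	*src += size;
	return 0;
}

/* Write one record, read it back; 0 if it round-trips, -1 otherwise. */
int
roundtrip_demo(char *buf, size_t bufsize)
{
	const char *name = "work_mem";		/* example values only */
	const char *value = "4096";
	const char *file = "postgresql.conf";
	int			line = 42;
	int			line_read = 0;
	char	   *wp;
	const char *rp;
	const char *end;
	const char *s;
	size_t		left;
	size_t		len;

	assert(buf != NULL);
	if (bufsize < sizeof(len))
		return -1;

	/* Writer: reserve the length header, then append the fields. */
	wp = buf + sizeof(len);
	left = bufsize - sizeof(len);
	if (put_string(&wp, &left, name) < 0 ||
		put_string(&wp, &left, value) < 0 ||
		put_string(&wp, &left, file) < 0 ||
		put_binary(&wp, &left, &line, sizeof(line)) < 0)
		return -1;
	len = bufsize - left - sizeof(len);
	memcpy(buf, &len, sizeof(len));		/* no alignment assumption */

	/* Reader: fetch the length, then walk the fields in the same order. */
	memcpy(&len, buf, sizeof(len));
	rp = buf + sizeof(len);
	end = rp + len;
	if ((s = get_string(&rp, end)) == NULL || strcmp(s, name) != 0)
		return -1;
	if ((s = get_string(&rp, end)) == NULL || strcmp(s, value) != 0)
		return -1;
	if ((s = get_string(&rp, end)) == NULL || strcmp(s, file) != 0)
		return -1;
	if (get_binary(&rp, end, &line_read, sizeof(line_read)) < 0 ||
		line_read != line)
		return -1;
	return (rp == end) ? 0 : -1;
}
```

Calling roundtrip_demo() with a 64-byte buffer succeeds.  With a buffer
no larger than the length header it fails, which corresponds to the
"not enough space to serialize GUC state" path in the patch.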

From 654e3c0371662533874323ef99ca06f3ab180d25 Mon Sep 17 00:00:00 2001
From: Robert Haas <rh...@postgresql.org>
Date: Thu, 17 Jul 2014 10:33:50 -0400
Subject: [PATCH 5/6] Add a function to get the authenticated user ID.

Previously, this was not exposed outside of miscinit.c.
---
 src/backend/utils/init/miscinit.c |   10 ++++++++++
 src/include/miscadmin.h           |    1 +
 2 files changed, 11 insertions(+)

diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index a703c67..8fccb4c 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -222,6 +222,16 @@ SetSessionUserId(Oid userid, bool is_superuser)
 	CurrentUserId = userid;
 }
 
+/*
+ * GetAuthenticatedUserId - get the authenticated user ID
+ */
+Oid
+GetAuthenticatedUserId(void)
+{
+	AssertState(OidIsValid(AuthenticatedUserId));
+	return AuthenticatedUserId;
+}
+
 
 /*
  * GetUserIdAndSecContext/SetUserIdAndSecContext - get/set the current user ID
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index c2b786e..4badcbc 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -282,6 +282,7 @@ extern char *GetUserNameFromId(Oid roleid);
 extern Oid	GetUserId(void);
 extern Oid	GetOuterUserId(void);
 extern Oid	GetSessionUserId(void);
+extern Oid	GetAuthenticatedUserId(void);
 extern void GetUserIdAndSecContext(Oid *userid, int *sec_context);
 extern void SetUserIdAndSecContext(Oid userid, int sec_context);
 extern bool InLocalUserIdChange(void);
-- 
1.7.9.6 (Apple Git-31.1)

From c835a06f20792556d35a0eee4c2fa21f5f23e8a3 Mon Sep 17 00:00:00 2001
From: Robert Haas <rh...@postgresql.org>
Date: Fri, 11 Jul 2014 09:53:40 -0400
Subject: [PATCH 6/6] pg_background: Run commands in a background worker, and
 get the results.

The currently-active GUC values from the user session will be copied
to the background worker.  If the command returns a result set, you
can retrieve the result set; if not, you can retrieve the command
tags.  If the command fails with an error, the same error will be
thrown in the launching process when the results are retrieved.
Warnings and other messages generated by the background worker, and
notifications received by it, are also propagated to the foreground
process.
---
 contrib/Makefile                             |    1 +
 contrib/pg_background/Makefile               |   18 +
 contrib/pg_background/pg_background--1.0.sql |   17 +
 contrib/pg_background/pg_background.c        |  919 ++++++++++++++++++++++++++
 contrib/pg_background/pg_background.control  |    4 +
 5 files changed, 959 insertions(+)
 create mode 100644 contrib/pg_background/Makefile
 create mode 100644 contrib/pg_background/pg_background--1.0.sql
 create mode 100644 contrib/pg_background/pg_background.c
 create mode 100644 contrib/pg_background/pg_background.control
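
As a rough illustration of how pg_background_result() turns a DataRow
message into column values, the following standalone sketch walks the
DataRow layout of the frontend/backend protocol: a 16-bit column count,
then for each column a 32-bit length (-1 for NULL) followed by that many
bytes, all in network byte order.  It is not code from the patch.  The
names parse_data_row, column_ref, read_be16, and read_be32 are
hypothetical, and the sketch assumes the buffer starts just after the
one-byte message type.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* One column of a DataRow: a pointer into the message, or NULL. */
typedef struct column_ref
{
	const unsigned char *data;	/* NULL for SQL NULL */
	int32_t		len;			/* -1 for SQL NULL */
} column_ref;

/* Read a big-endian 16-bit integer; -1 if fewer than 2 bytes remain. */
static int
read_be16(const unsigned char **p, const unsigned char *end, int16_t *out)
{
	if (end - *p < 2)
		return -1;
	*out = (int16_t) (((unsigned) (*p)[0] << 8) | (unsigned) (*p)[1]);
	*p += 2;
	return 0;
}

/* Read a big-endian 32-bit integer; -1 if fewer than 4 bytes remain. */
static int
read_be32(const unsigned char **p, const unsigned char *end, int32_t *out)
{
	uint32_t	v;

	if (end - *p < 4)
		return -1;
	v = ((uint32_t) (*p)[0] << 24) | ((uint32_t) (*p)[1] << 16) |
		((uint32_t) (*p)[2] << 8) | (uint32_t) (*p)[3];
	*out = (int32_t) v;
	*p += 4;
	return 0;
}

/*
 * Parse a DataRow body.  Returns the number of columns, or -1 if the
 * message is truncated, has trailing bytes, or has more than max_cols
 * columns.
 */
int
parse_data_row(const unsigned char *body, size_t body_len,
			   column_ref *cols, int max_cols)
{
	const unsigned char *p = body;
	const unsigned char *end = body + body_len;
	int16_t		natts;
	int			i;

	assert(body != NULL && cols != NULL);

	if (read_be16(&p, end, &natts) < 0 || natts < 0 || natts > max_cols)
		return -1;

	for (i = 0; i < natts; i++)
	{
		int32_t		len;

		if (read_be32(&p, end, &len) < 0)
			return -1;
		if (len < 0)
		{
			/* A negative length marks a NULL; no bytes follow. */
			cols[i].data = NULL;
			cols[i].len = -1;
			continue;
		}
		if ((size_t) (end - p) < (size_t) len)
			return -1;
		cols[i].data = p;
		cols[i].len = len;
		p += len;
	}

	/* Like pq_getmsgend(): the whole message must have been consumed. */
	return (p == end) ? natts : -1;
}
```

The module itself goes one step further and hands each non-NULL column
to the type's binary receive function; the sketch stops at locating the
bytes.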

diff --git a/contrib/Makefile b/contrib/Makefile
index b37d0dd..11d6116 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -30,6 +30,7 @@ SUBDIRS = \
 		pageinspect	\
 		passwordcheck	\
 		pg_archivecleanup \
+		pg_background \
 		pg_buffercache	\
 		pg_freespacemap \
 		pg_prewarm	\
diff --git a/contrib/pg_background/Makefile b/contrib/pg_background/Makefile
new file mode 100644
index 0000000..c4e717d
--- /dev/null
+++ b/contrib/pg_background/Makefile
@@ -0,0 +1,18 @@
+# contrib/pg_background/Makefile
+
+MODULE_big = pg_background
+OBJS = pg_background.o
+
+EXTENSION = pg_background
+DATA = pg_background--1.0.sql
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_background
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_background/pg_background--1.0.sql b/contrib/pg_background/pg_background--1.0.sql
new file mode 100644
index 0000000..5fa5ddb
--- /dev/null
+++ b/contrib/pg_background/pg_background--1.0.sql
@@ -0,0 +1,17 @@
+/* contrib/pg_background/pg_background--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_background" to load this file. \quit
+
+CREATE FUNCTION pg_background_launch(sql pg_catalog.text,
+					   queue_size pg_catalog.int4 DEFAULT 65536)
+    RETURNS pg_catalog.int4 STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_result(pid pg_catalog.int4)
+    RETURNS SETOF pg_catalog.record STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_detach(pid pg_catalog.int4)
+    RETURNS pg_catalog.void STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/pg_background/pg_background.c b/contrib/pg_background/pg_background.c
new file mode 100644
index 0000000..19d139b
--- /dev/null
+++ b/contrib/pg_background/pg_background.c
@@ -0,0 +1,919 @@
+/*--------------------------------------------------------------------------
+ *
+ * pg_background.c
+ *		Run SQL commands using a background worker.
+ *
+ * Copyright (C) 2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/pg_background/pg_background.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "fmgr.h"
+
+#include "access/htup_details.h"
+#include "access/printtup.h"
+#include "access/xact.h"
+#include "catalog/pg_type.h"
+#include "commands/async.h"
+#include "commands/dbcommands.h"
+#include "funcapi.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "miscadmin.h"
+#include "parser/analyze.h"
+#include "pgstat.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "tcop/pquery.h"
+#include "tcop/utility.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/ps_status.h"
+#include "utils/snapmgr.h"
+#include "utils/timeout.h"
+
+/* Table-of-contents constants for our dynamic shared memory segment. */
+#define PG_BACKGROUND_MAGIC				0x50674267
+#define PG_BACKGROUND_KEY_FIXED_DATA	0
+#define PG_BACKGROUND_KEY_SQL			1
+#define PG_BACKGROUND_KEY_GUC			2
+#define PG_BACKGROUND_KEY_QUEUE			3
+#define PG_BACKGROUND_NKEYS				4
+
+/* Fixed-size data passed via our dynamic shared memory segment. */
+typedef struct pg_background_fixed_data
+{
+	Oid	database_id;
+	Oid	authenticated_user_id;
+	Oid	current_user_id;
+	int	sec_context;
+	char database[NAMEDATALEN];
+	char authenticated_user[NAMEDATALEN];
+} pg_background_fixed_data;
+
+/* Private state maintained by the launching backend for IPC. */
+typedef struct pg_background_worker_info
+{
+	pid_t		pid;
+	dsm_segment *seg;
+	BackgroundWorkerHandle *handle;	
+	shm_mq_handle *responseq;	
+	bool		consumed;
+} pg_background_worker_info;
+
+/* Private state maintained across calls to pg_background_result. */
+typedef struct pg_background_result_state
+{
+	pg_background_worker_info *info;
+	FmgrInfo   *receive_functions;
+	Oid		   *typioparams;
+	bool		has_row_description;
+	List	   *command_tags;
+	bool		complete;
+} pg_background_result_state;
+
+static HTAB *worker_hash;
+
+static void cleanup_worker_info(dsm_segment *, Datum pid_datum);
+static pg_background_worker_info *find_worker_info(pid_t pid);
+static void save_worker_info(pid_t pid, dsm_segment *seg,
+				 BackgroundWorkerHandle *handle,
+				 shm_mq_handle *responseq);
+
+static HeapTuple form_result_tuple(pg_background_result_state *state,
+								   TupleDesc tupdesc, StringInfo msg);
+
+static void handle_sigterm(SIGNAL_ARGS);
+static void execute_sql_string(const char *sql);
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(pg_background_launch);
+PG_FUNCTION_INFO_V1(pg_background_result);
+PG_FUNCTION_INFO_V1(pg_background_detach);
+
+void pg_background_worker_main(Datum);
+
+/*
+ * Start a dynamic background worker to run a user-specified SQL command.
+ */
+Datum
+pg_background_launch(PG_FUNCTION_ARGS)
+{
+	text	   *sql = PG_GETARG_TEXT_PP(0);
+	int32		queue_size = PG_GETARG_INT32(1);
+	int32		sql_len = VARSIZE_ANY_EXHDR(sql);
+	Size		guc_len;
+	Size		segsize;
+	dsm_segment *seg;
+	shm_toc_estimator e;
+	shm_toc    *toc;
+	char	   *sqlp;
+	char	   *gucstate;
+	shm_mq	   *mq;
+	BackgroundWorker worker;
+	BackgroundWorkerHandle *worker_handle;
+	pg_background_fixed_data *fdata;
+	pid_t		pid;
+	shm_mq_handle *responseq;
+	MemoryContext	oldcontext;
+
+	/* Ensure a valid queue size. */
+	if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("queue size must be at least %zu bytes",
+						shm_mq_minimum_size)));
+
+	/* Create dynamic shared memory and table of contents. */
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(pg_background_fixed_data));
+	shm_toc_estimate_chunk(&e, sql_len + 1);
+	guc_len = EstimateGUCStateSpace();
+	shm_toc_estimate_chunk(&e, guc_len);
+	shm_toc_estimate_chunk(&e, (Size) queue_size);
+	shm_toc_estimate_keys(&e, PG_BACKGROUND_NKEYS);
+	segsize = shm_toc_estimate(&e);
+	seg = dsm_create(segsize);
+	toc = shm_toc_create(PG_BACKGROUND_MAGIC, dsm_segment_address(seg),
+						 segsize);
+
+	/* Store fixed-size data in dynamic shared memory. */
+	fdata = shm_toc_allocate(toc, sizeof(pg_background_fixed_data));
+	fdata->database_id = MyDatabaseId;
+	fdata->authenticated_user_id = GetAuthenticatedUserId();
+	GetUserIdAndSecContext(&fdata->current_user_id, &fdata->sec_context);
+	strlcpy(fdata->database, get_database_name(MyDatabaseId), NAMEDATALEN);
+	strlcpy(fdata->authenticated_user,
+			GetUserNameFromId(fdata->authenticated_user_id), NAMEDATALEN);
+	shm_toc_insert(toc, PG_BACKGROUND_KEY_FIXED_DATA, fdata);
+
+	/* Store SQL query in dynamic shared memory. */
+	sqlp = shm_toc_allocate(toc, sql_len + 1);
+	memcpy(sqlp, VARDATA_ANY(sql), sql_len);
+	sqlp[sql_len] = '\0';
+	shm_toc_insert(toc, PG_BACKGROUND_KEY_SQL, sqlp);
+
+	/* Store GUC state in dynamic shared memory. */
+	gucstate = shm_toc_allocate(toc, guc_len);
+	SerializeGUCState(guc_len, gucstate);
+	shm_toc_insert(toc, PG_BACKGROUND_KEY_GUC, gucstate);
+
+	/* Establish message queue in dynamic shared memory. */
+	mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
+					   (Size) queue_size);
+	shm_toc_insert(toc, PG_BACKGROUND_KEY_QUEUE, mq);
+	shm_mq_set_receiver(mq, MyProc);
+
+	/*
+	 * Attach the queue before launching a worker, so that we'll automatically
+	 * detach the queue if we error out.  (Otherwise, the worker might sit
+	 * there trying to write the queue long after we've gone away.)
+	 */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	responseq = shm_mq_attach(mq, seg, NULL);
+	MemoryContextSwitchTo(oldcontext);
+
+	/* Configure a worker. */
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_main = NULL;		/* new worker might not have library loaded */
+	sprintf(worker.bgw_library_name, "pg_background");
+	sprintf(worker.bgw_function_name, "pg_background_worker_main");
+	snprintf(worker.bgw_name, BGW_MAXLEN,
+			 "pg_background by PID %d", MyProcPid);
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+	/* set bgw_notify_pid, so we can detect if the worker stops */
+	worker.bgw_notify_pid = MyProcPid;
+
+	/*
+	 * Register the worker.
+	 *
+	 * We switch contexts so that the background worker handle can outlast
+	 * this transaction.
+	 */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	if (!RegisterDynamicBackgroundWorker(&worker, &worker_handle))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not register background process"),
+			 errhint("You may need to increase max_worker_processes.")));
+	MemoryContextSwitchTo(oldcontext);
+	shm_mq_set_handle(responseq, worker_handle);
+
+	/* Wait for the worker to start. */
+	switch (WaitForBackgroundWorkerStartup(worker_handle, &pid))
+	{
+		case BGWH_STARTED:
+			/* Success. */
+			break;
+		case BGWH_STOPPED:
+			pfree(worker_handle);
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+					 errmsg("could not start background process"),
+					 errhint("More details may be available in the server log.")));
+			break;
+		case BGWH_POSTMASTER_DIED:
+			pfree(worker_handle);
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+					 errmsg("cannot start background processes without postmaster"),
+					 errhint("Kill all remaining database processes and restart the database.")));
+			break;
+		default:
+			elog(ERROR, "unexpected bgworker handle status");
+			break;
+	}
+
+	/* Store the relevant details about this worker for future use. */
+	save_worker_info(pid, seg, worker_handle, responseq);
+
+	/*
+	 * Now that the worker info is saved, we do not need to, and should not,
+	 * automatically detach the segment at resource-owner cleanup time.
+	 */
+	dsm_keep_mapping(seg);
+
+	/* Return the worker's PID. */
+	PG_RETURN_INT32(pid);
+}
+
+/*
+ * Retrieve the results of a background query previously launched in this
+ * session.
+ */
+Datum
+pg_background_result(PG_FUNCTION_ARGS)
+{
+	int32		pid = PG_GETARG_INT32(0);
+	shm_mq_result	res;
+	FuncCallContext *funcctx;
+	TupleDesc	tupdesc;
+	StringInfoData	msg;
+	pg_background_result_state *state;
+
+	/* First-time setup. */
+	if (SRF_IS_FIRSTCALL())
+	{
+		MemoryContext	oldcontext;
+		pg_background_worker_info *info;
+
+		funcctx = SRF_FIRSTCALL_INIT();
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		/* See if we have a connection to the specified PID. */
+		if ((info = find_worker_info(pid)) == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("PID %d is not attached to this session", pid)));
+
+		/* Can't read results twice. */
+		if (info->consumed)
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("results for PID %d have already been consumed", pid)));
+		info->consumed = true;		
+
+		/*
+		 * Whether we succeed or fail, a future invocation of this function
+		 * may not try to read from the DSM once we've begun to do so.
+		 * Accordingly, make arrangements to clean things up at end of query.
+		 */
+		dsm_unkeep_mapping(info->seg);
+
+		/* Set up tuple-descriptor based on column definition list. */
+		if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("function returning record called in context "
+							"that cannot accept type record"),
+					 errhint("Try calling the function in the FROM clause "
+							 "using a column definition list.")));
+		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+		/* Cache state that will be needed on every call. */
+		state = palloc0(sizeof(pg_background_result_state));
+		state->info = info;
+		if (funcctx->tuple_desc->natts > 0)
+		{
+			int	natts = funcctx->tuple_desc->natts;
+			int	i;
+
+			state->receive_functions = palloc(sizeof(FmgrInfo) * natts);
+			state->typioparams = palloc(sizeof(Oid) * natts);
+
+			for (i = 0;	i < natts; ++i)
+			{
+				Oid	receive_function_id;
+
+				getTypeBinaryInputInfo(funcctx->tuple_desc->attrs[i]->atttypid,
+									   &receive_function_id,
+									   &state->typioparams[i]);
+				fmgr_info(receive_function_id, &state->receive_functions[i]);
+			}
+		}
+		funcctx->user_fctx = state;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+	funcctx = SRF_PERCALL_SETUP();
+	tupdesc = funcctx->tuple_desc;
+	state = funcctx->user_fctx;
+
+	/* Initialize message buffer. */
+	initStringInfo(&msg);
+
+	/* Read and process messages from the shared memory queue. */
+	for (;;)
+	{
+		char		msgtype;
+		Size		nbytes;
+		void	   *data;
+
+		/* Get next message. */
+		res = shm_mq_receive(state->info->responseq, &nbytes, &data, false);
+		if (res != SHM_MQ_SUCCESS)
+			break;
+
+		/*
+		 * Message-parsing routines operate on a null-terminated StringInfo,
+		 * so we must construct one.
+		 */
+		resetStringInfo(&msg);
+		enlargeStringInfo(&msg, nbytes);
+		msg.len = nbytes;
+		memcpy(msg.data, data, nbytes);
+		msg.data[nbytes] = '\0';
+		msgtype = pq_getmsgbyte(&msg);
+
+		/* Dispatch on message type. */
+		switch (msgtype)
+		{
+			case 'E':
+			case 'N':
+				{
+					ErrorData	edata;
+
+					/* Parse ErrorResponse or NoticeResponse. */
+					pq_parse_errornotice(&msg, &edata);
+
+					/*
+					 * Limit the maximum error level to ERROR.  We don't want
+					 * a FATAL inside the background worker to kill the user
+					 * session.
+					 */
+					if (edata.elevel > ERROR)
+						edata.elevel = ERROR;
+
+					/* Rethrow the error. */
+					ThrowErrorData(&edata);	
+					break;
+				}
+			case 'A':
+				{
+					/* Propagate NotificationResponse. */
+					pq_putmessage(msg.data[0], &msg.data[1], nbytes - 1);
+					break;
+				}
+			case 'T':
+				{
+					int16	natts = pq_getmsgint(&msg, 2);
+					int16	i;
+
+					if (state->has_row_description)
+						elog(ERROR, "multiple RowDescription messages");
+					state->has_row_description = true;
+					if (natts != tupdesc->natts)
+						ereport(ERROR,
+								(errcode(ERRCODE_DATATYPE_MISMATCH),
+								 errmsg("remote query result rowtype does not match "
+									"the specified FROM clause rowtype")));
+
+					for (i = 0; i < natts; ++i)
+					{
+						Oid		type_id;
+
+						(void) pq_getmsgstring(&msg);	/* name */
+						(void) pq_getmsgint(&msg, 4);	/* table OID */
+						(void) pq_getmsgint(&msg, 2);	/* table attnum */
+						type_id = pq_getmsgint(&msg, 4);	/* type OID */
+						(void) pq_getmsgint(&msg, 2);	/* type length */
+						(void) pq_getmsgint(&msg, 4);	/* typmod */
+						(void) pq_getmsgint(&msg, 2);	/* format code */
+
+						if (type_id != tupdesc->attrs[i]->atttypid)
+							ereport(ERROR,
+									(errcode(ERRCODE_DATATYPE_MISMATCH),
+									 errmsg("remote query result rowtype does not match "
+										"the specified FROM clause rowtype")));
+					}
+
+					pq_getmsgend(&msg);
+
+					break;
+				}
+			case 'D':
+				{
+					/* Handle DataRow message. */
+					HeapTuple	result;
+
+					result = form_result_tuple(state, tupdesc, &msg);
+					SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(result));
+				}
+			case 'C':
+				{
+					/* Handle CommandComplete message. */
+					MemoryContext	oldcontext;
+					const char  *tag = pq_getmsgstring(&msg);
+
+					oldcontext =
+						MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+					state->command_tags = lappend(state->command_tags,
+												  pstrdup(tag));
+					MemoryContextSwitchTo(oldcontext);
+					break;
+				}
+			case 'Z':
+				{
+					/* Handle ReadyForQuery message. */
+					state->complete = true;
+					break;
+				}
+			default:
+				elog(WARNING, "unknown message type: %c (%zu bytes)",
+					 msg.data[0], nbytes);
+				break;
+		}
+	}
+
+	/* Check whether the connection was broken prematurely. */
+	if (!state->complete)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("lost connection to worker process with PID %d",
+					pid)));
+
+	/* If no data rows, return the command tags instead. */
+	if (!state->has_row_description)
+	{
+		if (tupdesc->natts != 1 || tupdesc->attrs[0]->atttypid != TEXTOID)
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("remote query did not return a result set, but "
+							"result rowtype is not a single text column")));
+		if (state->command_tags != NIL)
+		{
+			char *tag = linitial(state->command_tags);
+			Datum	value;
+			bool	isnull;
+			HeapTuple	result;
+
+			state->command_tags = list_delete_first(state->command_tags);
+			value = PointerGetDatum(cstring_to_text(tag));
+			isnull = false;
+			result = heap_form_tuple(tupdesc, &value, &isnull);
+			SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(result));
+		}			
+	}
+
+	/* We're done! */
+	dsm_detach(state->info->seg);
+	SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * Parse a DataRow message and form a result tuple.
+ */
+static HeapTuple
+form_result_tuple(pg_background_result_state *state, TupleDesc tupdesc,
+				  StringInfo msg)
+{
+	/* Handle DataRow message. */
+	int16	natts = pq_getmsgint(msg, 2);
+	int16	i;
+	Datum  *values = NULL;
+	bool   *isnull = NULL;
+	StringInfoData	buf;
+
+	if (!state->has_row_description)
+		elog(ERROR, "DataRow not preceded by RowDescription");
+	if (natts != tupdesc->natts)
+		elog(ERROR, "malformed DataRow");
+	if (natts > 0)
+	{
+		values = palloc(natts * sizeof(Datum));
+		isnull = palloc(natts * sizeof(bool));
+	}
+	initStringInfo(&buf);
+
+	for (i = 0; i < natts; ++i)
+	{
+		int32	bytes = pq_getmsgint(msg, 4);
+
+		if (bytes < 0)
+		{
+			values[i] = ReceiveFunctionCall(&state->receive_functions[i],
+											NULL,
+											state->typioparams[i],
+											tupdesc->attrs[i]->atttypmod);
+			isnull[i] = true;
+		}
+		else
+		{
+			resetStringInfo(&buf);
+			appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, bytes), bytes);
+			values[i] = ReceiveFunctionCall(&state->receive_functions[i],
+											&buf,
+											state->typioparams[i],
+											tupdesc->attrs[i]->atttypmod);
+			isnull[i] = false;
+		}
+	}
+
+	pq_getmsgend(msg);
+
+	return heap_form_tuple(tupdesc, values, isnull);
+}
+
+/*
+ * Detach from the dynamic shared memory segment used for communication with
+ * a background worker.  This prevents the worker from stalling waiting for
+ * us to read its results.
+ */
+Datum
+pg_background_detach(PG_FUNCTION_ARGS)
+{
+	int32		pid = PG_GETARG_INT32(0);
+	pg_background_worker_info *info;
+
+	info = find_worker_info(pid);
+	if (info == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("PID %d is not attached to this session", pid)));
+	dsm_detach(info->seg);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * When the dynamic shared memory segment associated with a worker is
+ * cleaned up, we need to clean up our associated private data structures.
+ */
+static void
+cleanup_worker_info(dsm_segment *seg, Datum pid_datum)
+{
+	pid_t	pid = DatumGetInt32(pid_datum);
+	bool	found;
+	pg_background_worker_info *info;
+
+	/* Find any worker info entry for this PID.  If none, we're done. */
+	if ((info = find_worker_info(pid)) == NULL)
+		return;
+
+	/* Free memory used by the BackgroundWorkerHandle. */
+	if (info->handle != NULL)
+	{
+		pfree(info->handle);
+		info->handle = NULL;
+	}
+
+	/* Remove the hashtable entry. */
+	hash_search(worker_hash, (void *) &pid, HASH_REMOVE, &found);
+	if (!found)
+		elog(ERROR, "pg_background worker_hash table corrupted");
+}
+
+/*
+ * Find the background worker information for the worker with a given PID.
+ */
+static pg_background_worker_info *
+find_worker_info(pid_t pid)
+{
+	pg_background_worker_info *info = NULL;
+
+	if (worker_hash != NULL)
+		info = hash_search(worker_hash, (void *) &pid, HASH_FIND, NULL);
+
+	return info;
+}
+
+/*
+ * Save worker information for future IPC.
+ */
+static void
+save_worker_info(pid_t pid, dsm_segment *seg, BackgroundWorkerHandle *handle,
+				 shm_mq_handle *responseq)
+{
+	pg_background_worker_info *info;
+
+	/* If the hash table hasn't been set up yet, do that now. */
+	if (worker_hash == NULL)
+	{
+		HASHCTL	ctl;
+
+		ctl.keysize = sizeof(pid_t);
+		ctl.entrysize = sizeof(pg_background_worker_info);
+		ctl.hash = tag_hash;	/* key is a binary pid_t, not a C string */
+		worker_hash = hash_create("pg_background worker_hash", 8, &ctl, HASH_ELEM | HASH_FUNCTION);
+	}
+
+	/* Detach any older worker with this PID. */
+	if ((info = find_worker_info(pid)) != NULL)
+		dsm_detach(info->seg);
+
+	/* When the DSM is unmapped, clean everything up. */
+	on_dsm_detach(seg, cleanup_worker_info, Int32GetDatum(pid));
+
+	/* Create a new entry for this worker. */
+	info = hash_search(worker_hash, (void *) &pid, HASH_ENTER, NULL);
+	info->seg = seg;
+	info->handle = handle;
+	info->responseq = responseq;
+	info->consumed = false;
+}
+
+/*
+ * Background worker entrypoint.
+ */
+void
+pg_background_worker_main(Datum main_arg)
+{
+	dsm_segment *seg;
+	shm_toc    *toc;
+	pg_background_fixed_data *fdata;
+	char	   *sql;
+	char	   *gucstate;
+	shm_mq	   *mq;
+	shm_mq_handle *responseq;
+
+	/* Establish signal handlers. */
+	pqsignal(SIGTERM, handle_sigterm);
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up a memory context and resource owner. */
+	Assert(CurrentResourceOwner == NULL);
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "pg_background");
+	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+												 "pg_background session",
+												 ALLOCSET_DEFAULT_MINSIZE,
+												 ALLOCSET_DEFAULT_INITSIZE,
+												 ALLOCSET_DEFAULT_MAXSIZE);
+
+
+	/* Connect to the dynamic shared memory segment. */
+	seg = dsm_attach(DatumGetInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unable to map dynamic shared memory segment")));
+	toc = shm_toc_attach(PG_BACKGROUND_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			   errmsg("bad magic number in dynamic shared memory segment")));
+
+	/* Find data structures in dynamic shared memory. */
+	fdata = shm_toc_lookup(toc, PG_BACKGROUND_KEY_FIXED_DATA);
+	sql = shm_toc_lookup(toc, PG_BACKGROUND_KEY_SQL);
+	gucstate = shm_toc_lookup(toc, PG_BACKGROUND_KEY_GUC);
+	mq = shm_toc_lookup(toc, PG_BACKGROUND_KEY_QUEUE);
+	shm_mq_set_sender(mq, MyProc);
+	responseq = shm_mq_attach(mq, seg, NULL);
+
+	/* Redirect protocol messages to responseq. */
+	pq_redirect_to_shm_mq(mq, responseq);
+
+	/*
+	 * Initialize our user and database ID based on the string versions of
+	 * the data, and then go back and check that we actually got the database
+	 * and user ID that we intended to get.  We do this because it's not
+	 * impossible for the process that started us to die before we get here,
+	 * and the user or database could be renamed in the meantime.  We don't
+	 * want to latch onto the wrong object by accident.  There should probably
+	 * be a variant of BackgroundWorkerInitializeConnection that accepts OIDs
+	 * rather than strings.
+	 */
+	BackgroundWorkerInitializeConnection(fdata->database,
+										 fdata->authenticated_user);
+	if (fdata->database_id != MyDatabaseId ||
+		fdata->authenticated_user_id != GetAuthenticatedUserId())
+		ereport(ERROR,
+			(errmsg("user or database renamed during pg_background startup")));
+
+	/* Restore GUC values from launching backend. */
+	StartTransactionCommand();
+	RestoreGUCState(gucstate);
+	CommitTransactionCommand();
+
+	/* Restore user ID and security context. */
+	SetUserIdAndSecContext(fdata->current_user_id, fdata->sec_context);
+
+	/* Prepare to execute the query. */
+	SetCurrentStatementStartTimestamp();
+	debug_query_string = sql;
+	pgstat_report_activity(STATE_RUNNING, sql);
+	StartTransactionCommand();
+	if (StatementTimeout > 0)
+		enable_timeout_after(STATEMENT_TIMEOUT, StatementTimeout);
+	else
+		disable_timeout(STATEMENT_TIMEOUT, false);
+
+	/* Execute the query. */
+	execute_sql_string(sql);
+
+	/* Post-execution cleanup. */
+	disable_timeout(STATEMENT_TIMEOUT, false);
+	CommitTransactionCommand();
+	ProcessCompletedNotifies();
+	pgstat_report_activity(STATE_IDLE, sql);
+	pgstat_report_stat(true);
+
+	/* Signal that we are done. */
+	ReadyForQuery(DestRemote);
+}
+
+/*
+ * Execute given SQL string.
+ *
+ * Using SPI here would preclude backgrounding commands like VACUUM which one
+ * might very well wish to launch in the background.  So we do this instead.
+ */
+static void
+execute_sql_string(const char *sql)
+{
+	List	   *raw_parsetree_list;
+	ListCell   *lc1;
+	bool		isTopLevel;
+	int			commands_remaining;
+	MemoryContext	parsecontext;
+	MemoryContext	oldcontext;
+
+	/*
+	 * Parse the SQL string into a list of raw parse trees.
+	 *
+	 * Because we allow statements that perform internal transaction control,
+	 * we can't do this in TopTransactionContext; the parse trees might get
+	 * blown away before we're done executing them.
+	 */
+	parsecontext = AllocSetContextCreate(TopMemoryContext,
+										 "pg_background parse/plan",
+										 ALLOCSET_DEFAULT_MINSIZE,
+										 ALLOCSET_DEFAULT_INITSIZE,
+										 ALLOCSET_DEFAULT_MAXSIZE);
+	oldcontext = MemoryContextSwitchTo(parsecontext);
+	raw_parsetree_list = pg_parse_query(sql);
+	commands_remaining = list_length(raw_parsetree_list);
+	isTopLevel = commands_remaining == 1;
+	MemoryContextSwitchTo(oldcontext);
+
+	/*
+	 * Do parse analysis, rule rewrite, planning, and execution for each raw
+	 * parsetree.  We must fully execute each query before beginning parse
+	 * analysis on the next one, since there may be interdependencies.
+	 */
+	foreach(lc1, raw_parsetree_list)
+	{
+		Node	   *parsetree = (Node *) lfirst(lc1);
+		const char *commandTag;
+		char		completionTag[COMPLETION_TAG_BUFSIZE];
+		List	   *querytree_list,
+				   *plantree_list;
+		bool		snapshot_set = false;
+		Portal		portal;
+		DestReceiver *receiver;
+		int16		format = 1;
+
+		/*
+		 * We don't allow transaction-control commands like COMMIT and ABORT
+		 * here.  The entire SQL statement is executed as a single transaction
+		 * which commits if no errors are encountered.
+		 */
+		if (IsA(parsetree, TransactionStmt))
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("transaction control statements are not allowed in pg_background")));
+
+		/*
+		 * Get the command name for use in status display (it also becomes the
+		 * default completion tag, down inside PortalRun).  Set ps_status and
+		 * do any special start-of-SQL-command processing needed by the
+		 * destination.
+		 */
+		commandTag = CreateCommandTag(parsetree);
+		set_ps_display(commandTag, false);
+		BeginCommand(commandTag, DestNone);
+
+		/* Set up a snapshot if parse analysis/planning will need one. */
+		if (analyze_requires_snapshot(parsetree))
+		{
+			PushActiveSnapshot(GetTransactionSnapshot());
+			snapshot_set = true;
+		}
+
+		/*
+		 * OK to analyze, rewrite, and plan this query.
+		 *
+		 * As with parsing, we need to make sure this data outlives the
+		 * transaction, because of the possibility that the statement might
+		 * perform internal transaction control.
+		 */
+		oldcontext = MemoryContextSwitchTo(parsecontext);
+		querytree_list = pg_analyze_and_rewrite(parsetree, sql, NULL, 0);
+		plantree_list = pg_plan_queries(querytree_list, 0, NULL);
+
+		/* Done with the snapshot used for parsing/planning */
+		if (snapshot_set)
+			PopActiveSnapshot();
+
+		/* If we got a cancel signal in analysis or planning, quit */
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * Execute the query using the unnamed portal.
+		 */
+		portal = CreatePortal("", true, true);
+		/* Don't display the portal in pg_cursors */
+		portal->visible = false;
+		PortalDefineQuery(portal, NULL, sql, commandTag, plantree_list, NULL);
+		PortalStart(portal, NULL, 0, InvalidSnapshot);
+		PortalSetResultFormat(portal, 1, &format);		/* binary format */
+
+		/*
+		 * Tuples returned by any command other than the last are simply
+		 * discarded; but those returned by the last (or only) command are
+		 * redirected to the shared memory queue we're using for communication
+		 * with the launching backend. If the launching backend is gone or has
+		 * detached us, these messages will just get dropped on the floor.
+		 */
+		--commands_remaining;
+		if (commands_remaining > 0)
+			receiver = CreateDestReceiver(DestNone);
+		else
+		{
+			receiver = CreateDestReceiver(DestRemote);
+			SetRemoteDestReceiverParams(receiver, portal);
+		}
+
+		/*
+		 * Only once the portal and destreceiver have been established can
+		 * we return to the transaction context.  All that stuff needs to
+		 * survive an internal commit inside PortalRun!
+		 */
+		MemoryContextSwitchTo(oldcontext);
+
+		/* Here's where we actually execute the command. */
+		(void) PortalRun(portal, FETCH_ALL, isTopLevel, receiver, receiver,
+						 completionTag);
+
+		/* Clean up the receiver. */
+		(*receiver->rDestroy) (receiver);
+
+		/*
+		 * Send a CommandComplete message even if we suppressed the query
+		 * results.  The user backend will report these in the absence of
+		 * any true query results.
+		 */
+		EndCommand(completionTag, DestRemote);
+
+		/* Clean up the portal. */
+		PortalDrop(portal, false);
+	}
+
+	/* Be sure to advance the command counter after the last script command */
+	CommandCounterIncrement();
+}
+
+/*
+ * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just
+ * like a normal backend.  The next CHECK_FOR_INTERRUPTS() will do the right
+ * thing.
+ */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+
+	if (MyProc)
+		SetLatch(&MyProc->procLatch);
+
+	if (!proc_exit_inprogress)
+	{
+		InterruptPending = true;
+		ProcDiePending = true;
+	}
+
+	errno = save_errno;
+}
diff --git a/contrib/pg_background/pg_background.control b/contrib/pg_background/pg_background.control
new file mode 100644
index 0000000..733d0e1
--- /dev/null
+++ b/contrib/pg_background/pg_background.control
@@ -0,0 +1,4 @@
+comment = 'Run SQL queries in the background'
+default_version = '1.0'
+module_pathname = '$libdir/pg_background'
+relocatable = true
-- 
1.7.9.6 (Apple Git-31.1)
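
For readers unfamiliar with the deferred-interrupt idiom that
handle_sigterm relies on, here is a minimal standalone sketch.  It is
not part of the patch and uses no PostgreSQL headers: the sketch_*
names are hypothetical, the two flags stand in for InterruptPending and
ProcDiePending, and sketch_check_for_interrupts is only a rough analogue
of CHECK_FOR_INTERRUPTS().  The point is that the handler records the
request and preserves errno, and the real work happens later at a safe
point in the main loop.

```c
#include <assert.h>
#include <errno.h>
#include <signal.h>
#include <stdbool.h>

/* Hypothetical stand-ins for InterruptPending and ProcDiePending. */
static volatile sig_atomic_t sketch_interrupt_pending = 0;
static volatile sig_atomic_t sketch_die_pending = 0;

/* Handler: only record the request, and leave errno as we found it. */
static void
sketch_handle_sigterm(int signo)
{
	int			save_errno = errno;

	(void) signo;
	sketch_interrupt_pending = 1;
	sketch_die_pending = 1;
	errno = save_errno;
}

/* Install the handler; returns 0 on success, -1 on failure. */
static int
sketch_install_handler(void)
{
	return signal(SIGTERM, sketch_handle_sigterm) == SIG_ERR ? -1 : 0;
}

/* Rough analogue of CHECK_FOR_INTERRUPTS(): true means "stop now". */
static bool
sketch_check_for_interrupts(void)
{
	if (!sketch_interrupt_pending)
		return false;
	sketch_interrupt_pending = 0;
	return sketch_die_pending != 0;
}

/*
 * Run up to nsteps units of work, checking for a pending termination
 * request before each one.  If raise_at is a valid step index, SIGTERM
 * is raised during that step to simulate an external request.
 * Returns the number of steps completed.
 */
static int
sketch_run_steps(int nsteps, int raise_at)
{
	int			done = 0;

	assert(nsteps >= 0);
	for (int i = 0; i < nsteps; i++)
	{
		if (sketch_check_for_interrupts())
			break;
		if (i == raise_at)
			raise(SIGTERM);
		done++;
	}
	return done;
}
```

Note that the step during which the signal arrives still completes; the
loop only stops at the next check, which is the same property the patch
depends on.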

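The result-routing rule in execute_sql_string (rows from every command
but the last are discarded, while a completion message is still sent
for each one) can be modelled in a few lines.  This is a sketch under
stated assumptions, not code from the patch: SketchDest, SketchSummary
and the sketch_* functions are hypothetical, and the counters merely
tally what the real code would send through the shm_mq.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for DestNone and DestRemote. */
typedef enum
{
	SKETCH_DEST_NONE,
	SKETCH_DEST_REMOTE
} SketchDest;

typedef struct
{
	bool		is_top_level;	/* true only for a single-command string */
	int			forwarded;		/* commands whose rows were forwarded */
	int			discarded;		/* commands whose rows were dropped */
	int			completions;	/* completion messages sent */
} SketchSummary;

/* Only the last command (none remaining after it) gets a remote receiver. */
static SketchDest
sketch_choose_dest(int commands_remaining)
{
	return commands_remaining > 0 ? SKETCH_DEST_NONE : SKETCH_DEST_REMOTE;
}

/* Walk ncommands commands and tally where each one's output would go. */
static SketchSummary
sketch_route_commands(int ncommands)
{
	SketchSummary s = {ncommands == 1, 0, 0, 0};
	int			commands_remaining = ncommands;

	assert(ncommands >= 0);
	for (int i = 0; i < ncommands; i++)
	{
		--commands_remaining;
		if (sketch_choose_dest(commands_remaining) == SKETCH_DEST_REMOTE)
			s.forwarded++;
		else
			s.discarded++;

		/* A completion message goes out even when rows were suppressed. */
		s.completions++;
	}
	return s;
}
```

For a three-command string this tallies one forwarded result set, two
discarded ones, and three completion messages.
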
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)