On Wed, Oct 8, 2014 at 6:32 PM, Andres Freund <and...@2ndquadrant.com> wrote:
> I got to ask: Why is it helpful that we have this in contrib? I have a
> good share of blame to bear for that, but I think we need to stop
> diluting contrib evermore with test programs. These have a place, but I
> don't think it should be contrib.

I don't think pg_background is merely a test program: I think it's a
quite useful piece of functionality.  It can be used for running
VACUUM from a procedure, which is something people have asked for more
than once, or for simulating an autonomous transaction.  Granted,
it'll be a lot slower than a real autonomous transaction, but it's
still better than doing it via dblink, because you don't have to futz
with authentication.

I would be all in favor of moving things like test_decoding,
test_parser, and test_shm_mq to src/test or contrib/test or wherever
else we want to put them, but I think pg_background belongs in
contrib.

>> +/* Fixed-size data passed via our dynamic shared memory segment. */
>> +typedef struct pg_background_fixed_data
>> +{
>> +     Oid     database_id;
>> +     Oid     authenticated_user_id;
>> +     Oid     current_user_id;
>> +     int     sec_context;
>> +     char database[NAMEDATALEN];
>> +     char authenticated_user[NAMEDATALEN];
>> +} pg_background_fixed_data;
>
> Why not NameData?

No particular reason.  Changed.

> whitespace damage.

I went through and fixed everything that git diff --check complained
about.  Let me know if you still see other problems.

>> +static HTAB *worker_hash;
>
> Hm. So the lifetime of this hash's contents is managed via
> on_dsm_detach(), do I understand that correctly?

More or less, yes.

> Hm. So every user can do this once the extension is created as the
> functions are most likely to be PUBLIC. Is this a good idea?

Why not?  If they can log in, they could start separate sessions with
similar effect.

>> +             /*
>> +              * Whether we succeed or fail, a future invocation of this function
>> +              * may not try to read from the DSM once we've begun to do so.
>> +              * Accordingly, make arrangements to clean things up at end of query.
>> +              */
>> +             dsm_unkeep_mapping(info->seg);
>> +
>> +             /* Set up tuple-descriptor based on column definition list. */
>> +             if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
>> +                     ereport(ERROR,
>> +                                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
>> +                                      errmsg("function returning record called in context "
>> +                                                     "that cannot accept type record"),
>> +                                      errhint("Try calling the function in the FROM clause "
>> +                                                      "using a column definition list.")));
>
> Hm, normally we don't add linebreaks inside error messages.

I copied it from dblink.

> I'm unsure right now about the rules surrounding this, but shouldn't we
> check that the user is allowed to execute these? And shouldn't we fall
> back to non binary functions if no binary ones are available?

I can't see any reason to do either of those things.  I'm not aware
that returning data in binary format is in any way intended to be a
security-restricted operation, or that we have any data types that
actually matter without send and receive functions.  If we do, I think
the solution is to add them, not make this more complicated.

>> +                                     /*
>> +                                      * Limit the maximum error level to ERROR.  We don't want
>> +                                      * a FATAL inside the background worker to kill the user
>> +                                      * session.
>> +                                      */
>> +                                     if (edata.elevel > ERROR)
>> +                                             edata.elevel = ERROR;
>
> Hm. But we still should report that it FATALed? Maybe add error context
> notice about it? Not nice, but I don't have an immediately better idea. I
> think it generally would be rather helpful to add the information that
> this wasn't originally an error triggered by this process. The user
> might otherwise be confused when looking for the origin of the error in
> the log.

Yeah, I was wondering if we needed some kind of annotation here.  What
I'm wondering about is appending something to the errcontext, perhaps
"background worker, PID %d".
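
As a rough illustration of that idea, here is a standalone sketch (not code
from the patch).  SketchErrorData and the SK_* levels are made-up stand-ins
for ErrorData and the real elevel constants, and annotate_worker_error() is
a hypothetical helper; only the clamp to ERROR and the "background worker,
PID %d" wording come from the discussion above.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Made-up severity levels; only their relative order matters here. */
enum { SK_WARNING = 1, SK_ERROR = 2, SK_FATAL = 3, SK_PANIC = 4 };

/* Hypothetical, cut-down stand-in for ErrorData. */
typedef struct SketchErrorData
{
	int		elevel;
	char	context[256];
} SketchErrorData;

/*
 * Cap the severity at ERROR and append a line naming the worker to the
 * context.  Returns 1 if the severity was downgraded, else 0.
 */
static int
annotate_worker_error(SketchErrorData *edata, int worker_pid)
{
	int		downgraded = 0;
	size_t	used;

	assert(edata != NULL);

	if (edata->elevel > SK_ERROR)
	{
		edata->elevel = SK_ERROR;
		downgraded = 1;
	}

	/* Append on a new line if there is already some context. */
	used = strlen(edata->context);
	snprintf(edata->context + used, sizeof(edata->context) - used,
			 "%sbackground worker, PID %d",
			 used > 0 ? "\n" : "", worker_pid);

	return downgraded;
}
```

The return value is one possible way for the caller to learn that the
original report was FATAL or worse, in case it wants to say so as well.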

>> +                     case 'A':
>> +                             {
>> +                                     /* Propagate NotifyResponse. */
>> +                                     pq_putmessage(msg.data[0], &msg.data[1], nbytes - 1);
>> +                                     break;
>
> Hm. Are we sure to be in a situation where the client expects these? And
> are we sure their encoding is correct? The other data go through
> input/output methods checking for that, but here we don't. And the other
> side AFAICS could have done a SET client_encoding.

I think there's no problem at the protocol level; I think the server
can send NotifyResponse pretty much whenever.  It could be argued that
this is a POLA violation, but dropping the notify messages on the
floor (which seems to be the other option) doesn't seem superior.
So I think this is mostly a matter of documentation.

>> +/*
>> + * Parse a DataRow message and form a result tuple.
>> + */
>> +static HeapTuple
>> +form_result_tuple(pg_background_result_state *state, TupleDesc tupdesc,
>> +                               StringInfo msg)
>> +{
>> +     /* Handle DataRow message. */
>> +     int16   natts = pq_getmsgint(msg, 2);
>> +     int16   i;
>> +     Datum  *values = NULL;
>> +     bool   *isnull = NULL;
>> +     StringInfoData  buf;
>> +
>> +     if (!state->has_row_description)
>> +             elog(ERROR, "DataRow not preceded by RowDescription");
>> +     if (natts != tupdesc->natts)
>> +             elog(ERROR, "malformed DataRow");
>> +     if (natts > 0)
>> +     {
>> +             values = palloc(natts * sizeof(Datum));
>> +             isnull = palloc(natts * sizeof(bool));
>> +     }
>> +     initStringInfo(&buf);
>> +
>> +     for (i = 0; i < natts; ++i)
>> +     {
>> +             int32   bytes = pq_getmsgint(msg, 4);
>> +
>> +             if (bytes < 0)
>> +             {
>> +                     values[i] = ReceiveFunctionCall(&state->receive_functions[i],
>> +                                                     NULL,
>> +                                                     state->typioparams[i],
>> +                                                     tupdesc->attrs[i]->atttypmod);
>> +                     isnull[i] = true;
>
>> +             }
>> +             else
>> +             {
>> +                     resetStringInfo(&buf);
>> +                     appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, bytes), bytes);
>> +                     values[i] = ReceiveFunctionCall(&state->receive_functions[i],
>> +                                                     &buf,
>> +                                                     state->typioparams[i],
>> +                                                     tupdesc->attrs[i]->atttypmod);
>> +                     isnull[i] = false;
>> +             }
>> +     }
>
> Hm. I think you didn't check that the typemods are the same above.

The same as what?

>> +Datum
>> +pg_background_detach(PG_FUNCTION_ARGS)
>> +{
>> +     int32           pid = PG_GETARG_INT32(0);
>> +     pg_background_worker_info *info;
>> +
>> +     info = find_worker_info(pid);
>> +     if (info == NULL)
>> +             ereport(ERROR,
>> +                             (errcode(ERRCODE_UNDEFINED_OBJECT),
>> +                              errmsg("PID %d is not attached to this session", pid)));
>> +     dsm_detach(info->seg);
>> +
>> +     PG_RETURN_VOID();
>> +}
>
> So there's really no limit on who is allowed to do stuff like
> this. I.e. any SECURITY DEFINER and such may do the same.

Do you think we need a restriction?  It's not obvious to me that there
are any security-relevant consequences to this, but it's an important
question, and I might be missing something.

>> +     /* Establish signal handlers. */
>> +     pqsignal(SIGTERM, handle_sigterm);
>
> Hm. No SIGINT?

Nope; bgworker.c sets it to StatementCancelHandler, which should be
fine.  Ideally I wouldn't have to do anything with SIGTERM either, but
bgworker.c sets it to bgworker_die(), which is pretty much complete
junk.  It's not safe to just ereport() from within whatever the heck
the caller is doing.  We should probably drop a small thermonuclear
weapon on bgworker_die(), but that's a problem for another patch.
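
For readers unfamiliar with the alternative to reporting from inside a
signal handler, the usual shape is to have the handler only record that the
signal arrived and to act on it later from a safe point.  A minimal
standalone sketch follows; the sketch_* names are hypothetical, and the
handle_sigterm() in the patch may well do more than this.

```c
#include <assert.h>
#include <signal.h>

/* Set by the handler, consumed by the main loop. */
static volatile sig_atomic_t got_sigterm = 0;

/* The handler only records the event; no reporting or allocation here. */
static void
sketch_handle_sigterm(int signo)
{
	(void) signo;
	got_sigterm = 1;
}

/* Install the handler.  Returns 0 on success, -1 on failure. */
static int
sketch_install_handlers(void)
{
	if (signal(SIGTERM, sketch_handle_sigterm) == SIG_ERR)
		return -1;
	return 0;
}

/*
 * Called from a safe point in the main loop.  Returns 1 if a SIGTERM
 * arrived since the last call (and clears the flag), else 0.
 */
static int
sketch_consume_sigterm(void)
{
	if (got_sigterm)
	{
		got_sigterm = 0;
		return 1;
	}
	return 0;
}
```

The point of the split is that whatever code was interrupted gets to reach
a consistent state before anything is reported or torn down.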

>> +     /* Find data structures in dynamic shared memory. */
>> +     fdata = shm_toc_lookup(toc, PG_BACKGROUND_KEY_FIXED_DATA);
>> +     sql = shm_toc_lookup(toc, PG_BACKGROUND_KEY_SQL);
>> +     gucstate = shm_toc_lookup(toc, PG_BACKGROUND_KEY_GUC);
>> +     mq = shm_toc_lookup(toc, PG_BACKGROUND_KEY_QUEUE);
>> +     shm_mq_set_sender(mq, MyProc);
>> +     responseq = shm_mq_attach(mq, seg, NULL);
>
> Don't these need to ensure that values have been found? shm_toc_lookup
> returns NULL for unknown items and such?

Meh.  The launching process would have errored out if it hadn't been
able to set up the segment correctly.  We could add some Assert()
statements if you really feel strongly about it, but it seems fairly
pointless to me.  Any situation where those pointers come back NULL is
presumably going to be some sort of really stupid bug that will be
found even by trivial testing.
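
If Assert()s were added, the pattern would be roughly as follows.  This is
a toy, standalone model rather than the shm_toc API: SketchToc and the
sketch_toc_* functions are invented for illustration, and the only
behavior borrowed from the discussion is that a lookup returns NULL for an
unknown key.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_TOC_MAX 8

/* Toy table of contents: a small array of key/address pairs. */
typedef struct SketchTocEntry
{
	uint64_t	key;
	void	   *addr;
} SketchTocEntry;

typedef struct SketchToc
{
	size_t		nentries;
	SketchTocEntry entries[SKETCH_TOC_MAX];
} SketchToc;

/* Register an address under a key.  Returns 0 on success, -1 if full. */
static int
sketch_toc_insert(SketchToc *toc, uint64_t key, void *addr)
{
	if (toc->nentries >= SKETCH_TOC_MAX)
		return -1;
	toc->entries[toc->nentries].key = key;
	toc->entries[toc->nentries].addr = addr;
	toc->nentries++;
	return 0;
}

/* Look up a key; returns NULL if it was never inserted. */
static void *
sketch_toc_lookup(const SketchToc *toc, uint64_t key)
{
	size_t		i;

	for (i = 0; i < toc->nentries; i++)
	{
		if (toc->entries[i].key == key)
			return toc->entries[i].addr;
	}
	return NULL;
}

/* Same lookup, but a missing key is treated as a programming error. */
static void *
sketch_toc_lookup_checked(const SketchToc *toc, uint64_t key)
{
	void	   *addr = sketch_toc_lookup(toc, key);

	assert(addr != NULL);
	return addr;
}
```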

>> +     /* Restore GUC values from launching backend. */
>> +     StartTransactionCommand();
>> +     RestoreGUCState(gucstate);
>> +     CommitTransactionCommand();
>
> I haven't read the guc save patch, but is it a) required to do this in a
> transaction? We normally reload the config even without. b) correct to
> do? What's with SET LOCAL variables?

(a) Yeah, it doesn't work without that.  I forget what breaks, but if
you take those out, it will blow up.

(b) Do those need special handling for some reason?

> I doubt that actually works correctly without a SIGINT handler as
> statement timeout just falls back to kill(SIGINT)? Or does it, because
> it falls back to just doing a proc_exit()? If so, is that actually safe?

See above kvetching.

>> +     /* Post-execution cleanup. */
>> +     disable_timeout(STATEMENT_TIMEOUT, false);
>> +     CommitTransactionCommand();
>
> So, we're allowed to do nearly arbitrary nastiness here...

Can you be more specific about the nature of your concern?  This is no
different than finish_xact_command().

>> +     /*
>> +      * Parse the SQL string into a list of raw parse trees.
>> +      *
>> +      * Because we allow statements that perform internal transaction control,
>> +      * we can't do this in TopTransactionContext; the parse trees might get
>> +      * blown away before we're done executing them.
>> +      */
>> +     parsecontext = AllocSetContextCreate(TopMemoryContext,
>> +                                          "pg_background parse/plan",
>> +                                          ALLOCSET_DEFAULT_MINSIZE,
>> +                                          ALLOCSET_DEFAULT_INITSIZE,
>> +                                          ALLOCSET_DEFAULT_MAXSIZE);
>
> Not that it hugely matters, but shouldn't this rather be
> TopTransactionContext?

No, for the reasons explained in the comment that you quoted right, uh, there.

>> +             bool            snapshot_set = false;
>> +             Portal          portal;
>> +             DestReceiver *receiver;
>> +             int16           format = 1;
>> +
>> +             /*
>> +              * We don't allow transaction-control commands like COMMIT and ABORT
>> +              * here.  The entire SQL statement is executed as a single transaction
>> +              * which commits if no errors are encountered.
>> +              */
>> +             if (IsA(parsetree, TransactionStmt))
>> +                     ereport(ERROR,
>> +                                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
>> +                                      errmsg("transaction control statements are not allowed in pg_background")));
>
> Hm. I don't think that goes far enough. This allows commands that
> internally stop/start transactions like CREATE INDEX CONCURRENTLY. Are
> you sure that's working right now?

I tested VACUUM without naming a specific table and that seems to work
correctly.  I could try CIC, but I think the issues are the same.  If
we only get one parse tree, then isTopLevel will be true and it's safe
for commands to do their own transaction control.  If we get multiple
parse trees, then PreventTransactionChain will do its thing.  This is
not novel territory.
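
Reduced to its bare logic, the rule described above looks like the
following standalone sketch.  The names are hypothetical, and it leaves out
the other things the real check looks at (for instance, whether we are
already inside a transaction block); it only models "one parse tree means
top level".

```c
#include <assert.h>

typedef enum { SK_RUN, SK_REJECT } SketchVerdict;

/*
 * n_parsetrees: how many statements the SQL string parsed into.
 * manages_own_xacts: nonzero for commands that start and stop
 * transactions internally (VACUUM, CREATE INDEX CONCURRENTLY, ...).
 */
static SketchVerdict
sketch_check_statement(int n_parsetrees, int manages_own_xacts)
{
	int		is_top_level;

	assert(n_parsetrees >= 1);

	/* A lone statement counts as top level. */
	is_top_level = (n_parsetrees == 1);

	/* Such commands are refused when part of a multi-statement string. */
	if (manages_own_xacts && !is_top_level)
		return SK_REJECT;

	return SK_RUN;
}
```

So a string containing just VACUUM runs, while "VACUUM; SELECT 1" would be
refused for the VACUUM part.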

> Hm. This is a fair amount of code copied from postgres.c.

Yes.  I'm open to suggestions, but I don't immediately see a better way.

> I think this is interesting work, but I doubt it's ready yet. I need to
> read the preceding patches, to really understand where breakage lies
> hidden.

Breakage??? In my code??? Surely not.  :-)

I'm reattaching all the uncommitted patches here.  #3 and #6 have been
updated; #2 and #4 are unchanged.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From d982dbb620b6a5d9992b3259550c3e16e7e8e28a Mon Sep 17 00:00:00 2001
From: Robert Haas <rh...@postgresql.org>
Date: Wed, 23 Jul 2014 11:16:44 -0400
Subject: [PATCH 2/6] Extend dsm API with a new function dsm_unkeep_mapping.

This reassociates a dynamic shared memory handle previously passed to
dsm_keep_mapping with the current resource owner, so that it will be
cleaned up at the end of the current query.
---
 src/backend/storage/ipc/dsm.c |   18 ++++++++++++++++++
 src/include/storage/dsm.h     |    1 +
 2 files changed, 19 insertions(+)

diff --git a/src/backend/storage/ipc/dsm.c b/src/backend/storage/ipc/dsm.c
index a5c0084..6039beb 100644
--- a/src/backend/storage/ipc/dsm.c
+++ b/src/backend/storage/ipc/dsm.c
@@ -796,6 +796,24 @@ dsm_keep_mapping(dsm_segment *seg)
 }
 
 /*
+ * Arrange to remove a dynamic shared memory mapping at cleanup time.
+ *
+ * dsm_keep_mapping() can be used to preserve a mapping for the entire
+ * lifetime of a process; this function reverses that decision, making
+ * the segment owned by the current resource owner.  This may be useful
+ * just before performing some operation that will invalidate the segment
+ * for future use by this backend.
+ */
+void
+dsm_unkeep_mapping(dsm_segment *seg)
+{
+	Assert(seg->resowner == NULL);
+	ResourceOwnerEnlargeDSMs(CurrentResourceOwner);
+	seg->resowner = CurrentResourceOwner;
+	ResourceOwnerRememberDSM(seg->resowner, seg);
+}
+
+/*
  * Keep a dynamic shared memory segment until postmaster shutdown.
  *
  * This function should not be called more than once per segment;
diff --git a/src/include/storage/dsm.h b/src/include/storage/dsm.h
index 1d0110d..1694409 100644
--- a/src/include/storage/dsm.h
+++ b/src/include/storage/dsm.h
@@ -37,6 +37,7 @@ extern void dsm_detach(dsm_segment *seg);
 
 /* Resource management functions. */
 extern void dsm_keep_mapping(dsm_segment *seg);
+extern void dsm_unkeep_mapping(dsm_segment *seg);
 extern void dsm_keep_segment(dsm_segment *seg);
 extern dsm_segment *dsm_find_mapping(dsm_handle h);
 
-- 
1.7.9.6 (Apple Git-31.1)

commit e23e994817526afb5de0106e5ea612894ae23fbd
Author: Robert Haas <rh...@postgresql.org>
Date:   Wed May 28 19:30:21 2014 -0400

    Support frontend-backend protocol communication using a shm_mq.
    
    A background worker can use pq_redirect_to_shm_mq() to direct protocol
    messages that would normally be sent to the frontend to a shm_mq so that
    another process may read them.
    
    The receiving process may use pq_parse_errornotice() to parse an
    ErrorResponse or NoticeResponse from the background worker and, if
    it wishes, ThrowErrorData() to propagate the error (with or without
    further modification).
    
    V2: Lots more hooks, and a struct!
    V3: Fix whitespace damage; rename send-methods to comm-methods.

diff --git a/src/backend/libpq/Makefile b/src/backend/libpq/Makefile
index 8be0572..09410c4 100644
--- a/src/backend/libpq/Makefile
+++ b/src/backend/libpq/Makefile
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
 # be-fsstubs is here for historical reasons, probably belongs elsewhere
 
 OBJS = be-fsstubs.o be-secure.o auth.o crypt.o hba.o ip.o md5.o pqcomm.o \
-       pqformat.o pqsignal.o
+       pqformat.o pqmq.o pqsignal.o
 
 ifeq ($(with_openssl),yes)
 OBJS += be-secure-openssl.o
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 605d891..dcbb704 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -102,7 +102,6 @@
 int			Unix_socket_permissions;
 char	   *Unix_socket_group;
 
-
 /* Where the Unix socket files are (list of palloc'd strings) */
 static List *sock_paths = NIL;
 
@@ -134,16 +133,38 @@ static bool DoingCopyOut;
 
 
 /* Internal functions */
-static void pq_close(int code, Datum arg);
+static void socket_comm_reset(void);
+static void socket_close(int code, Datum arg);
+static void socket_set_nonblocking(bool nonblocking);
+static int	socket_flush(void);
+static int	socket_flush_if_writable(void);
+static bool socket_is_send_pending(void);
+static int	socket_putmessage(char msgtype, const char *s, size_t len);
+static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
+static void socket_startcopyout(void);
+static void socket_endcopyout(bool errorAbort);
 static int	internal_putbytes(const char *s, size_t len);
 static int	internal_flush(void);
-static void pq_set_nonblocking(bool nonblocking);
+static void socket_set_nonblocking(bool nonblocking);
 
 #ifdef HAVE_UNIX_SOCKETS
 static int	Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
 static int	Setup_AF_UNIX(char *sock_path);
 #endif   /* HAVE_UNIX_SOCKETS */
 
+PQcommMethods PQcommSocketMethods;
+
+static PQcommMethods PqCommSocketMethods = {
+	socket_comm_reset,
+	socket_flush,
+	socket_flush_if_writable,
+	socket_is_send_pending,
+	socket_putmessage,
+	socket_putmessage_noblock,
+	socket_startcopyout,
+	socket_endcopyout
+};
+
 
 /* --------------------------------
  *		pq_init - initialize libpq at backend startup
@@ -152,24 +173,25 @@ static int	Setup_AF_UNIX(char *sock_path);
 void
 pq_init(void)
 {
+	PqCommMethods = &PqCommSocketMethods;
 	PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
 	PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
 	PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
 	PqCommBusy = false;
 	DoingCopyOut = false;
-	on_proc_exit(pq_close, 0);
+	on_proc_exit(socket_close, 0);
 }
 
 /* --------------------------------
- *		pq_comm_reset - reset libpq during error recovery
+ *		socket_comm_reset - reset libpq during error recovery
  *
  * This is called from error recovery at the outer idle loop.  It's
  * just to get us out of trouble if we somehow manage to elog() from
  * inside a pqcomm.c routine (which ideally will never happen, but...)
  * --------------------------------
  */
-void
-pq_comm_reset(void)
+static void
+socket_comm_reset(void)
 {
 	/* Do not throw away pending data, but do reset the busy flag */
 	PqCommBusy = false;
@@ -178,14 +200,14 @@ pq_comm_reset(void)
 }
 
 /* --------------------------------
- *		pq_close - shutdown libpq at backend exit
+ *		socket_close - shutdown libpq at backend exit
  *
  * Note: in a standalone backend MyProcPort will be null,
  * don't crash during exit...
  * --------------------------------
  */
 static void
-pq_close(int code, Datum arg)
+socket_close(int code, Datum arg)
 {
 	if (MyProcPort != NULL)
 	{
@@ -783,15 +805,20 @@ TouchSocketFiles(void)
  */
 
 /* --------------------------------
- *			  pq_set_nonblocking - set socket blocking/non-blocking
+ *			  socket_set_nonblocking - set socket blocking/non-blocking
  *
  * Sets the socket non-blocking if nonblocking is TRUE, or sets it
  * blocking otherwise.
  * --------------------------------
  */
 static void
-pq_set_nonblocking(bool nonblocking)
+socket_set_nonblocking(bool nonblocking)
 {
+	if (MyProcPort == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+				 errmsg("there is no client connection")));
+
 	if (MyProcPort->noblock == nonblocking)
 		return;
 
@@ -844,7 +871,7 @@ pq_recvbuf(void)
 	}
 
 	/* Ensure that we're in blocking mode */
-	pq_set_nonblocking(false);
+	socket_set_nonblocking(false);
 
 	/* Can fill buffer from PqRecvLength and upwards */
 	for (;;)
@@ -935,7 +962,7 @@ pq_getbyte_if_available(unsigned char *c)
 	}
 
 	/* Put the socket into non-blocking mode */
-	pq_set_nonblocking(true);
+	socket_set_nonblocking(true);
 
 	r = secure_read(MyProcPort, c, 1);
 	if (r < 0)
@@ -1194,7 +1221,7 @@ internal_putbytes(const char *s, size_t len)
 		/* If buffer is full, then flush it out */
 		if (PqSendPointer >= PqSendBufferSize)
 		{
-			pq_set_nonblocking(false);
+			socket_set_nonblocking(false);
 			if (internal_flush())
 				return EOF;
 		}
@@ -1210,13 +1237,13 @@ internal_putbytes(const char *s, size_t len)
 }
 
 /* --------------------------------
- *		pq_flush		- flush pending output
+ *		socket_flush		- flush pending output
  *
  *		returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-int
-pq_flush(void)
+static int
+socket_flush(void)
 {
 	int			res;
 
@@ -1224,7 +1251,7 @@ pq_flush(void)
 	if (PqCommBusy)
 		return 0;
 	PqCommBusy = true;
-	pq_set_nonblocking(false);
+	socket_set_nonblocking(false);
 	res = internal_flush();
 	PqCommBusy = false;
 	return res;
@@ -1310,8 +1337,8 @@ internal_flush(void)
  * Returns 0 if OK, or EOF if trouble.
  * --------------------------------
  */
-int
-pq_flush_if_writable(void)
+static int
+socket_flush_if_writable(void)
 {
 	int			res;
 
@@ -1324,7 +1351,7 @@ pq_flush_if_writable(void)
 		return 0;
 
 	/* Temporarily put the socket into non-blocking mode */
-	pq_set_nonblocking(true);
+	socket_set_nonblocking(true);
 
 	PqCommBusy = true;
 	res = internal_flush();
@@ -1333,11 +1360,11 @@ pq_flush_if_writable(void)
 }
 
 /* --------------------------------
- *		pq_is_send_pending	- is there any pending data in the output buffer?
+ *	socket_is_send_pending	- is there any pending data in the output buffer?
  * --------------------------------
  */
-bool
-pq_is_send_pending(void)
+static bool
+socket_is_send_pending(void)
 {
 	return (PqSendStart < PqSendPointer);
 }
@@ -1351,7 +1378,7 @@ pq_is_send_pending(void)
 
 
 /* --------------------------------
- *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
+ *		socket_putmessage - send a normal message (suppressed in COPY OUT mode)
  *
  *		If msgtype is not '\0', it is a message type code to place before
  *		the message body.  If msgtype is '\0', then the message has no type
@@ -1375,8 +1402,8 @@ pq_is_send_pending(void)
  *		returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-int
-pq_putmessage(char msgtype, const char *s, size_t len)
+static int
+socket_putmessage(char msgtype, const char *s, size_t len)
 {
 	if (DoingCopyOut || PqCommBusy)
 		return 0;
@@ -1408,8 +1435,8 @@ fail:
  *		If the output buffer is too small to hold the message, the buffer
  *		is enlarged.
  */
-void
-pq_putmessage_noblock(char msgtype, const char *s, size_t len)
+static void
+socket_putmessage_noblock(char msgtype, const char *s, size_t len)
 {
 	int res		PG_USED_FOR_ASSERTS_ONLY;
 	int			required;
@@ -1431,18 +1458,18 @@ pq_putmessage_noblock(char msgtype, const char *s, size_t len)
 
 
 /* --------------------------------
- *		pq_startcopyout - inform libpq that an old-style COPY OUT transfer
+ *		socket_startcopyout - inform libpq that an old-style COPY OUT transfer
  *			is beginning
  * --------------------------------
  */
-void
-pq_startcopyout(void)
+static void
+socket_startcopyout(void)
 {
 	DoingCopyOut = true;
 }
 
 /* --------------------------------
- *		pq_endcopyout	- end an old-style COPY OUT transfer
+ *		socket_endcopyout	- end an old-style COPY OUT transfer
  *
  *		If errorAbort is indicated, we are aborting a COPY OUT due to an error,
  *		and must send a terminator line.  Since a partial data line might have
@@ -1451,8 +1478,8 @@ pq_startcopyout(void)
  *		not allow binary transfers, so a textual terminator is always correct.
  * --------------------------------
  */
-void
-pq_endcopyout(bool errorAbort)
+static void
+socket_endcopyout(bool errorAbort)
 {
 	if (!DoingCopyOut)
 		return;
@@ -1462,7 +1489,6 @@ pq_endcopyout(bool errorAbort)
 	DoingCopyOut = false;
 }
 
-
 /*
  * Support for TCP Keepalive parameters
  */
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
new file mode 100644
index 0000000..6e6b429
--- /dev/null
+++ b/src/backend/libpq/pqmq.c
@@ -0,0 +1,261 @@
+/*-------------------------------------------------------------------------
+ *
+ * pqmq.c
+ *	  Use the frontend/backend protocol for communication over a shm_mq
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *	src/backend/libpq/pqmq.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+
+static shm_mq *pq_mq;
+static shm_mq_handle *pq_mq_handle;
+static bool pq_mq_busy = false;
+
+static void mq_comm_reset(void);
+static int	mq_flush(void);
+static int	mq_flush_if_writable(void);
+static bool mq_is_send_pending(void);
+static int	mq_putmessage(char msgtype, const char *s, size_t len);
+static void mq_putmessage_noblock(char msgtype, const char *s, size_t len);
+static void mq_startcopyout(void);
+static void mq_endcopyout(bool errorAbort);
+
+static PQcommMethods PqCommMqMethods = {
+	mq_comm_reset,
+	mq_flush,
+	mq_flush_if_writable,
+	mq_is_send_pending,
+	mq_putmessage,
+	mq_putmessage_noblock,
+	mq_startcopyout,
+	mq_endcopyout
+};
+
+/*
+ * Arrange to redirect frontend/backend protocol messages to a shared-memory
+ * message queue.
+ */
+void
+pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
+{
+	PqCommMethods = &PqCommMqMethods;
+	pq_mq = mq;
+	pq_mq_handle = mqh;
+	whereToSendOutput = DestRemote;
+	FrontendProtocol = PG_PROTOCOL_LATEST;
+}
+
+static void
+mq_comm_reset(void)
+{
+	/* Nothing to do. */
+}
+
+static int
+mq_flush(void)
+{
+	/* Nothing to do. */
+	return 0;
+}
+
+static int
+mq_flush_if_writable(void)
+{
+	/* Nothing to do. */
+	return 0;
+}
+
+static bool
+mq_is_send_pending(void)
+{
+	/* There's never anything pending. */
+	return false;
+}
+
+/*
+ * Transmit a libpq protocol message to the shared memory message queue
+ * selected via pq_mq_handle.  We don't include a length word, because the
+ * receiver will know the length of the message from shm_mq_receive().
+ */
+static int
+mq_putmessage(char msgtype, const char *s, size_t len)
+{
+	shm_mq_iovec	iov[2];
+	shm_mq_result	result;
+
+	/*
+	 * If we're sending a message, and we have to wait because the
+	 * queue is full, and then we get interrupted, and that interrupt
+	 * results in trying to send another message, we respond by detaching
+	 * the queue.  There's no way to return to the original context, but
+	 * even if there were, just queueing the message would amount to
+	 * indefinitely postponing the response to the interrupt.  So we do
+	 * this instead.
+	 */
+	if (pq_mq_busy)
+	{
+		if (pq_mq != NULL)
+			shm_mq_detach(pq_mq);
+		pq_mq = NULL;
+		return EOF;
+	}
+
+	pq_mq_busy = true;
+
+	iov[0].data = &msgtype;
+	iov[0].len = 1;
+	iov[1].data = s;
+	iov[1].len = len;
+
+	Assert(pq_mq_handle != NULL);
+	result = shm_mq_sendv(pq_mq_handle, iov, 2, false);
+
+	pq_mq_busy = false;
+
+	Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
+	if (result != SHM_MQ_SUCCESS)
+		return EOF;
+	return 0;
+}
+
+static void
+mq_putmessage_noblock(char msgtype, const char *s, size_t len)
+{
+	/*
+	 * While the shm_mq machinery does support sending a message in
+	 * non-blocking mode, there's currently no way to try beginning to
+	 * send the message that doesn't also commit us to completing the
+	 * transmission.  This could be improved in the future, but for now
+	 * we don't need it.
+	 */
+	elog(ERROR, "not currently supported");
+}
+
+static void
+mq_startcopyout(void)
+{
+	/* Nothing to do. */
+}
+
+static void
+mq_endcopyout(bool errorAbort)
+{
+	/* Nothing to do. */
+}
+
+/*
+ * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
+ * structure with the results.
+ */
+void
+pq_parse_errornotice(StringInfo msg, ErrorData *edata)
+{
+	/* Initialize edata with reasonable defaults. */
+	MemSet(edata, 0, sizeof(ErrorData));
+	edata->elevel = ERROR;
+	edata->assoc_context = CurrentMemoryContext;
+
+	/* Loop over fields and extract each one. */
+	for (;;)
+	{
+		char	code = pq_getmsgbyte(msg);
+		const char *value;
+
+		if (code == '\0')
+		{
+			pq_getmsgend(msg);
+			break;
+		}
+		value = pq_getmsgstring(msg);
+
+		switch (code)
+		{
+			case PG_DIAG_SEVERITY:
+				if (strcmp(value, "DEBUG") == 0)
+					edata->elevel = DEBUG1;	/* or some other DEBUG level */
+				else if (strcmp(value, "LOG") == 0)
+					edata->elevel = LOG;	/* can't be COMMERROR */
+				else if (strcmp(value, "INFO") == 0)
+					edata->elevel = INFO;
+				else if (strcmp(value, "NOTICE") == 0)
+					edata->elevel = NOTICE;
+				else if (strcmp(value, "WARNING") == 0)
+					edata->elevel = WARNING;
+				else if (strcmp(value, "ERROR") == 0)
+					edata->elevel = ERROR;
+				else if (strcmp(value, "FATAL") == 0)
+					edata->elevel = FATAL;
+				else if (strcmp(value, "PANIC") == 0)
+					edata->elevel = PANIC;
+				else
+					elog(ERROR, "unknown error severity");
+				break;
+			case PG_DIAG_SQLSTATE:
+				if (strlen(value) != 5)
+					elog(ERROR, "malformed sql state");
+				edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2],
+												  value[3], value[4]);
+				break;
+			case PG_DIAG_MESSAGE_PRIMARY:
+				edata->message = pstrdup(value);
+				break;
+			case PG_DIAG_MESSAGE_DETAIL:
+				edata->detail = pstrdup(value);
+				break;
+			case PG_DIAG_MESSAGE_HINT:
+				edata->hint = pstrdup(value);
+				break;
+			case PG_DIAG_STATEMENT_POSITION:
+				edata->cursorpos = pg_atoi(value, sizeof(int), '\0');
+				break;
+			case PG_DIAG_INTERNAL_POSITION:
+				edata->internalpos = pg_atoi(value, sizeof(int), '\0');
+				break;
+			case PG_DIAG_INTERNAL_QUERY:
+				edata->internalquery = pstrdup(value);
+				break;
+			case PG_DIAG_CONTEXT:
+				edata->context = pstrdup(value);
+				break;
+			case PG_DIAG_SCHEMA_NAME:
+				edata->schema_name = pstrdup(value);
+				break;
+			case PG_DIAG_TABLE_NAME:
+				edata->table_name = pstrdup(value);
+				break;
+			case PG_DIAG_COLUMN_NAME:
+				edata->column_name = pstrdup(value);
+				break;
+			case PG_DIAG_DATATYPE_NAME:
+				edata->datatype_name = pstrdup(value);
+				break;
+			case PG_DIAG_CONSTRAINT_NAME:
+				edata->constraint_name = pstrdup(value);
+				break;
+			case PG_DIAG_SOURCE_FILE:
+				edata->filename = pstrdup(value);
+				break;
+			case PG_DIAG_SOURCE_LINE:
+				edata->lineno = pg_atoi(value, sizeof(int), '\0');
+				break;
+			case PG_DIAG_SOURCE_FUNCTION:
+				edata->funcname = pstrdup(value);
+				break;
+			default:
+				elog(ERROR, "unknown error field: %d", (int) code);
+				break;
+		}
+	}
+}
diff --git a/src/backend/utils/adt/numutils.c b/src/backend/utils/adt/numutils.c
index ca5a8a5..1d13363 100644
--- a/src/backend/utils/adt/numutils.c
+++ b/src/backend/utils/adt/numutils.c
@@ -34,7 +34,7 @@
  * overflow.
  */
 int32
-pg_atoi(char *s, int size, int c)
+pg_atoi(const char *s, int size, int c)
 {
 	long		l;
 	char	   *badp;
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 32a9663..2316464 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1577,6 +1577,57 @@ FlushErrorState(void)
 }
 
 /*
+ * ThrowErrorData --- report an error described by an ErrorData structure
+ *
+ * This is intended to be used to re-report errors originally thrown by
+ * background worker processes and then propagated (with or without
+ * modification) to the backend responsible for them.
+ */
+void
+ThrowErrorData(ErrorData *edata)
+{
+	ErrorData *newedata;
+	MemoryContext	oldcontext;
+
+	if (!errstart(edata->elevel, edata->filename, edata->lineno,
+				  edata->funcname, NULL))
+		return;
+
+	newedata = &errordata[errordata_stack_depth];
+	oldcontext = MemoryContextSwitchTo(newedata->assoc_context);
+
+	/* Copy the supplied fields to the error stack. */
+	if (edata->sqlerrcode > 0)
+		newedata->sqlerrcode = edata->sqlerrcode;
+	if (edata->message)
+		newedata->message = pstrdup(edata->message);
+	if (edata->detail)
+		newedata->detail = pstrdup(edata->detail);
+	if (edata->detail_log)
+		newedata->detail_log = pstrdup(edata->detail_log);
+	if (edata->hint)
+		newedata->hint = pstrdup(edata->hint);
+	if (edata->context)
+		newedata->context = pstrdup(edata->context);
+	if (edata->schema_name)
+		newedata->schema_name = pstrdup(edata->schema_name);
+	if (edata->table_name)
+		newedata->table_name = pstrdup(edata->table_name);
+	if (edata->column_name)
+		newedata->column_name = pstrdup(edata->column_name);
+	if (edata->datatype_name)
+		newedata->datatype_name = pstrdup(edata->datatype_name);
+	if (edata->constraint_name)
+		newedata->constraint_name = pstrdup(edata->constraint_name);
+	if (edata->internalquery)
+		newedata->internalquery = pstrdup(edata->internalquery);
+
+	MemoryContextSwitchTo(oldcontext);
+
+	errfinish(0);
+}
+
+/*
  * ReThrowError --- re-throw a previously copied error
  *
  * A handler can do CopyErrorData/FlushErrorState to get out of the error
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 5da9d8d..409f3d7 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -37,6 +37,31 @@ typedef struct
 	}			u;
 } PQArgBlock;
 
+typedef struct
+{
+	void (*comm_reset)(void);
+	int	(*flush)(void);
+	int	(*flush_if_writable)(void);
+	bool (*is_send_pending)(void);
+	int	(*putmessage)(char msgtype, const char *s, size_t len);
+	void (*putmessage_noblock)(char msgtype, const char *s, size_t len);
+	void (*startcopyout)(void);
+	void (*endcopyout)(bool errorAbort);
+} PQcommMethods;
+
+extern PQcommMethods *PqCommMethods;
+
+#define pq_comm_reset()	(PqCommMethods->comm_reset())
+#define pq_flush() (PqCommMethods->flush())
+#define pq_flush_if_writable() (PqCommMethods->flush_if_writable())
+#define pq_is_send_pending() (PqCommMethods->is_send_pending())
+#define pq_putmessage(msgtype, s, len) \
+	(PqCommMethods->putmessage(msgtype, s, len))
+#define pq_putmessage_noblock(msgtype, s, len) \
+	(PqCommMethods->putmessage_noblock(msgtype, s, len))
+#define pq_startcopyout() (PqCommMethods->startcopyout())
+#define pq_endcopyout(errorAbort) (PqCommMethods->endcopyout(errorAbort))
+
 /*
  * External functions.
  */
@@ -51,7 +76,6 @@ extern int	StreamConnection(pgsocket server_fd, Port *port);
 extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
 extern void pq_init(void);
-extern void pq_comm_reset(void);
 extern int	pq_getbytes(char *s, size_t len);
 extern int	pq_getstring(StringInfo s);
 extern int	pq_getmessage(StringInfo s, int maxlen);
@@ -59,13 +83,6 @@ extern int	pq_getbyte(void);
 extern int	pq_peekbyte(void);
 extern int	pq_getbyte_if_available(unsigned char *c);
 extern int	pq_putbytes(const char *s, size_t len);
-extern int	pq_flush(void);
-extern int	pq_flush_if_writable(void);
-extern bool pq_is_send_pending(void);
-extern int	pq_putmessage(char msgtype, const char *s, size_t len);
-extern void pq_putmessage_noblock(char msgtype, const char *s, size_t len);
-extern void pq_startcopyout(void);
-extern void pq_endcopyout(bool errorAbort);
 
 /*
  * prototypes for functions in be-secure.c
@@ -75,6 +92,9 @@ extern char *ssl_key_file;
 extern char *ssl_ca_file;
 extern char *ssl_crl_file;
 
+extern int	(*pq_putmessage_hook)(char msgtype, const char *s, size_t len);
+extern int  (*pq_flush_hook)(void);
+
 extern int	secure_initialize(void);
 extern bool secure_loaded_verify_locations(void);
 extern void secure_destroy(void);
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
new file mode 100644
index 0000000..6bb24d9
--- /dev/null
+++ b/src/include/libpq/pqmq.h
@@ -0,0 +1,22 @@
+/*-------------------------------------------------------------------------
+ *
+ * pqmq.h
+ *	  Use the frontend/backend protocol for communication over a shm_mq
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/libpq/pqmq.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PQMQ_H
+#define PQMQ_H
+
+#include "storage/shm_mq.h"
+
+extern void	pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *);
+
+extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
+
+#endif   /* PQMQ_H */
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index fb1b4a4..4e74d85 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -285,7 +285,7 @@ extern Datum current_schema(PG_FUNCTION_ARGS);
 extern Datum current_schemas(PG_FUNCTION_ARGS);
 
 /* numutils.c */
-extern int32 pg_atoi(char *s, int size, int c);
+extern int32 pg_atoi(const char *s, int size, int c);
 extern void pg_itoa(int16 i, char *a);
 extern void pg_ltoa(int32 l, char *a);
 extern void pg_lltoa(int64 ll, char *a);
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 92073be..87438b8 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -415,6 +415,7 @@ extern ErrorData *CopyErrorData(void);
 extern void FreeErrorData(ErrorData *edata);
 extern void FlushErrorState(void);
 extern void ReThrowError(ErrorData *edata) __attribute__((noreturn));
+extern void ThrowErrorData(ErrorData *edata);
 extern void pg_re_throw(void) __attribute__((noreturn));
 
 extern char *GetErrorContextStack(void);
From 59d3cb93c566368d9cc9dd26b3ffd0924c98ee36 Mon Sep 17 00:00:00 2001
From: Robert Haas <rh...@postgresql.org>
Date: Thu, 17 Jul 2014 07:58:32 -0400
Subject: [PATCH 4/6] Add infrastructure to save and restore GUC values.

Amit Khandekar and Noah Misch, with a bit of further hacking by me.
---
 src/backend/utils/misc/guc.c |  397 ++++++++++++++++++++++++++++++++++++++++++
 src/include/utils/guc.h      |    5 +
 2 files changed, 402 insertions(+)
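
For anyone reading along, here is a minimal standalone sketch of the buffer layout this patch uses: a size prefix, then for each setting two NUL-terminated strings followed by a fixed-size binary field, with every read and write bounds-checked. It is not the patch code. The demo_* names and the three-field record are assumptions made up for illustration, and errors are reported by return value rather than elog().

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for one setting; not a PostgreSQL type. */
typedef struct
{
	const char *name;
	const char *value;
	int			source;
} demo_setting;

/* Append a NUL-terminated string; 0 on success, -1 if it does not fit. */
static int
demo_put_string(char **dest, size_t *left, const char *s)
{
	size_t		n = strlen(s) + 1;

	if (n > *left)
		return -1;
	memcpy(*dest, s, n);
	*dest += n;
	*left -= n;
	return 0;
}

/* Append raw bytes; 0 on success, -1 if they do not fit. */
static int
demo_put_binary(char **dest, size_t *left, const void *val, size_t n)
{
	if (n > *left)
		return -1;
	memcpy(*dest, val, n);
	*dest += n;
	*left -= n;
	return 0;
}

/* Write the size prefix, then name, value and source for each entry. */
static int
demo_serialize(char *buf, size_t maxsize, const demo_setting *s, int n)
{
	size_t		actual;
	size_t		left;
	char	   *cur;
	int			i;

	if (maxsize < sizeof(actual))
		return -1;
	cur = buf + sizeof(actual);
	left = maxsize - sizeof(actual);
	for (i = 0; i < n; i++)
	{
		if (demo_put_string(&cur, &left, s[i].name) != 0 ||
			demo_put_string(&cur, &left, s[i].value) != 0 ||
			demo_put_binary(&cur, &left, &s[i].source, sizeof(int)) != 0)
			return -1;
	}
	/* Store the payload size with memcpy so buf need not be aligned. */
	actual = maxsize - left - sizeof(actual);
	memcpy(buf, &actual, sizeof(actual));
	return 0;
}

/* Return the string at *src and step past its NUL; NULL if unterminated. */
static const char *
demo_get_string(const char **src, const char *end)
{
	const char *start = *src;
	const char *p;

	for (p = start; p < end && *p != '\0'; p++)
		;
	if (p >= end)
		return NULL;
	*src = p + 1;
	return start;
}

/* Decode up to max_out entries; returns the count, or -1 if malformed. */
static int
demo_restore(const char *buf, demo_setting *out, int max_out)
{
	size_t		len;
	const char *src;
	const char *end;
	int			n = 0;

	memcpy(&len, buf, sizeof(len));
	src = buf + sizeof(len);
	end = src + len;
	while (src < end)
	{
		const char *name = demo_get_string(&src, end);
		const char *value = name ? demo_get_string(&src, end) : NULL;

		if (value == NULL || (size_t) (end - src) < sizeof(int) || n >= max_out)
			return -1;
		out[n].name = name;
		out[n].value = value;
		memcpy(&out[n].source, src, sizeof(int));
		src += sizeof(int);
		n++;
	}
	assert(src == end);
	return n;
}
```

The decoded strings point into the serialized buffer rather than being copied, which is also how read_gucstate() hands back its results, so the buffer has to outlive them.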

diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6c52db8..cb324f3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -117,6 +117,12 @@
 #define S_PER_D (60 * 60 * 24)
 #define MS_PER_D (1000 * 60 * 60 * 24)
 
+/*
+ * Precision with which REAL type guc values are to be printed for GUC
+ * serialization.
+ */
+#define REALTYPE_PRECISION 17
+
 /* XXX these should appear in other modules' header files */
 extern bool Log_disconnections;
 extern int	CommitDelay;
@@ -146,6 +152,10 @@ char	   *GUC_check_errmsg_string;
 char	   *GUC_check_errdetail_string;
 char	   *GUC_check_errhint_string;
 
+static void
+do_serialize(char **destptr, Size *maxbytes, const char *fmt,...)
+/* This lets gcc check the format string for consistency. */
+__attribute__((format(PG_PRINTF_ATTRIBUTE, 3, 4)));
 
 static void set_config_sourcefile(const char *name, char *sourcefile,
 					  int sourceline);
@@ -8409,6 +8419,393 @@ read_nondefault_variables(void)
 }
 #endif   /* EXEC_BACKEND */
 
+/*
+ * can_skip_gucvar:
+ * When serializing, determine whether to skip this GUC.  When restoring, the
+ * negation of this test determines whether to restore the compiled-in default
+ * value before processing serialized values.
+ *
+ * A PGC_S_DEFAULT setting on the serialize side will typically match new
+ * postmaster children, but that can be false when got_SIGHUP == true and the
+ * pending configuration change modifies this setting.  Nonetheless, we omit
+ * PGC_S_DEFAULT settings from serialization and make up for that by restoring
+ * defaults before applying serialized values.
+ *
+ * PGC_POSTMASTER variables always have the same value in every child of a
+ * particular postmaster.  Most PGC_INTERNAL variables are compile-time
+ * constants; a few, like server_encoding and lc_ctype, are handled specially
+ * outside the serialize/restore procedure.  Therefore, SerializeGUCState()
+ * never sends these, and RestoreGUCState() never changes them.
+ */
+static bool
+can_skip_gucvar(struct config_generic * gconf)
+{
+	return gconf->context == PGC_POSTMASTER ||
+		gconf->context == PGC_INTERNAL || gconf->source == PGC_S_DEFAULT;
+}
+
+/*
+ * estimate_variable_size:
+ * Estimate max size for dumping the given GUC variable.
+ */
+static Size
+estimate_variable_size(struct config_generic * gconf)
+{
+	Size		size;
+	Size		valsize;
+
+	if (can_skip_gucvar(gconf))
+		return 0;
+
+	size = 0;
+
+	size = add_size(size, strlen(gconf->name) + 1);
+
+	/* Get the maximum display length of the GUC value. */
+	switch (gconf->vartype)
+	{
+		case PGC_BOOL:
+			{
+				valsize = 5;	/* max(strlen('true'), strlen('false')) */
+			}
+			break;
+
+		case PGC_INT:
+			{
+				struct config_int *conf = (struct config_int *) gconf;
+
+				/*
+				 * Instead of getting the exact display length, use max
+				 * length.  Also reduce the max length for typical ranges of
+				 * small values.  Maximum value is 2147483647, i.e. 10 chars.
+				 * Include one byte for sign.
+				 */
+				if (abs(*conf->variable) < 1000)
+					valsize = 3 + 1;
+				else
+					valsize = 10 + 1;
+			}
+			break;
+
+		case PGC_REAL:
+			{
+				/*
+				 * We are going to print it with %.17g. Account for sign,
+				 * decimal point, and e+nnn notation. E.g.
+				 * -3.9932904234000002e+110
+				 */
+				valsize = REALTYPE_PRECISION + 1 + 1 + 5;
+			}
+			break;
+
+		case PGC_STRING:
+			{
+				struct config_string *conf = (struct config_string *) gconf;
+
+				valsize = strlen(*conf->variable);
+			}
+			break;
+
+		case PGC_ENUM:
+			{
+				struct config_enum *conf = (struct config_enum *) gconf;
+
+				valsize = strlen(config_enum_lookup_by_value(conf, *conf->variable));
+			}
+			break;
+	}
+
+	/* Valsize + NULL terminator */
+	size = add_size(size, valsize + 1);
+
+	if (gconf->sourcefile)
+		size = add_size(size, strlen(gconf->sourcefile));
+
+	/* NULL char */
+	size = add_size(size, 1);
+
+	/* Include line whenever we include file. */
+	if (gconf->sourcefile)
+		size = add_size(size, sizeof(gconf->sourceline));
+
+	size = add_size(size, sizeof(gconf->source));
+	size = add_size(size, sizeof(gconf->scontext));
+
+	return size;
+}
+
+/*
+ * EstimateGUCStateSpace:
+ * Returns the size needed to store the GUC state for the current process
+ */
+Size
+EstimateGUCStateSpace(void)
+{
+	Size		size;
+	int			i;
+
+	/* Add space required for saving the data size of the GUC state */
+	size = sizeof(Size);
+
+	/* Add up the space needed for each GUC variable */
+	for (i = 0; i < num_guc_variables; i++)
+		size = add_size(size,
+						estimate_variable_size(guc_variables[i]));
+
+	return size;
+}
+
+/*
+ * do_serialize:
+ * Copies the formatted string into the destination.  Moves ahead the
+ * destination pointer, and decrements the maxbytes by that many bytes. If
+ * maxbytes is not sufficient to copy the string, error out.
+ */
+static void
+do_serialize(char **destptr, Size *maxbytes, const char *fmt,...)
+{
+	va_list		vargs;
+	int			n;
+
+	if (*maxbytes <= 0)
+		elog(ERROR, "not enough space to serialize GUC state");
+
+	va_start(vargs, fmt);
+	n = vsnprintf(*destptr, *maxbytes, fmt, vargs);
+	va_end(vargs);
+
+	/*
+	 * Cater to portability hazards in the vsnprintf() return value just like
+	 * appendPQExpBufferVA() does.  Note that this requires an extra byte of
+	 * slack at the end of the buffer.  Since serialize_variable() ends with a
+	 * do_serialize_binary() rather than a do_serialize(), we'll always have
+	 * that slack; estimate_variable_size() need not add a byte for it.
+	 */
+	if (n < 0 || n >= *maxbytes - 1)
+	{
+		if (n < 0 && errno != 0 && errno != ENOMEM)
+			/* Shouldn't happen. Better show errno description. */
+			elog(ERROR, "vsnprintf failed: %m");
+		else
+			elog(ERROR, "not enough space to serialize GUC state");
+	}
+
+	/* Shift the destptr ahead of the null terminator */
+	*destptr += n + 1;
+	*maxbytes -= n + 1;
+}
+
+/* Binary copy version of do_serialize() */
+static void
+do_serialize_binary(char **destptr, Size *maxbytes, void *val, Size valsize)
+{
+	if (valsize > *maxbytes)
+		elog(ERROR, "not enough space to serialize GUC state");
+
+	memcpy(*destptr, val, valsize);
+	*destptr += valsize;
+	*maxbytes -= valsize;
+}
+
+/*
+ * serialize_variable:
+ * Dumps name, value and other information of a GUC variable into destptr.
+ */
+static void
+serialize_variable(char **destptr, Size *maxbytes,
+				   struct config_generic * gconf)
+{
+	if (can_skip_gucvar(gconf))
+		return;
+
+	do_serialize(destptr, maxbytes, "%s", gconf->name);
+
+	switch (gconf->vartype)
+	{
+		case PGC_BOOL:
+			{
+				struct config_bool *conf = (struct config_bool *) gconf;
+
+				do_serialize(destptr, maxbytes,
+							 (*conf->variable ? "true" : "false"));
+			}
+			break;
+
+		case PGC_INT:
+			{
+				struct config_int *conf = (struct config_int *) gconf;
+
+				do_serialize(destptr, maxbytes, "%d", *conf->variable);
+			}
+			break;
+
+		case PGC_REAL:
+			{
+				struct config_real *conf = (struct config_real *) gconf;
+
+				do_serialize(destptr, maxbytes, "%.*g",
+							 REALTYPE_PRECISION, *conf->variable);
+			}
+			break;
+
+		case PGC_STRING:
+			{
+				struct config_string *conf = (struct config_string *) gconf;
+
+				do_serialize(destptr, maxbytes, "%s", *conf->variable);
+			}
+			break;
+
+		case PGC_ENUM:
+			{
+				struct config_enum *conf = (struct config_enum *) gconf;
+
+				do_serialize(destptr, maxbytes, "%s",
+						 config_enum_lookup_by_value(conf, *conf->variable));
+			}
+			break;
+	}
+
+	do_serialize(destptr, maxbytes, "%s",
+				 (gconf->sourcefile ? gconf->sourcefile : ""));
+
+	if (gconf->sourcefile)
+		do_serialize_binary(destptr, maxbytes, &gconf->sourceline,
+							sizeof(gconf->sourceline));
+
+	do_serialize_binary(destptr, maxbytes, &gconf->source,
+						sizeof(gconf->source));
+	do_serialize_binary(destptr, maxbytes, &gconf->scontext,
+						sizeof(gconf->scontext));
+}
+
+/*
+ * SerializeGUCState:
+ * Dumps the complete GUC state onto the memory location at start_address.
+ */
+void
+SerializeGUCState(Size maxsize, char *start_address)
+{
+	char	   *curptr;
+	Size		actual_size;
+	Size		bytes_left;
+	int			i;
+	int			i_role;
+
+	/* Reserve space for saving the actual size of the guc state */
+	curptr = start_address + sizeof(actual_size);
+	bytes_left = maxsize - sizeof(actual_size);
+
+	for (i = 0; i < num_guc_variables; i++)
+	{
+		/*
+		 * It's pretty ugly, but we've got to force "role" to be initialized
+		 * after "session_authorization"; otherwise, the latter will override
+		 * the former.
+		 */
+		if (strcmp(guc_variables[i]->name, "role") == 0)
+			i_role = i;
+		else
+			serialize_variable(&curptr, &bytes_left, guc_variables[i]);
+	}
+	serialize_variable(&curptr, &bytes_left, guc_variables[i_role]);
+
+	/* Store actual size without assuming alignment of start_address. */
+	actual_size = maxsize - bytes_left - sizeof(actual_size);
+	memcpy(start_address, &actual_size, sizeof(actual_size));
+}
+
+/*
+ * read_gucstate:
+ * Actually it does not read anything, just returns the srcptr. But it does
+ * move the srcptr past the terminating NULL char, so that the caller is ready
+ * to read the next string.
+ */
+static char *
+read_gucstate(char **srcptr, char *srcend)
+{
+	char	   *retptr = *srcptr;
+	char	   *ptr;
+
+	if (*srcptr >= srcend)
+		elog(ERROR, "incomplete GUC state");
+
+	/* The string variables are all null terminated */
+	for (ptr = *srcptr; ptr < srcend && *ptr != '\0'; ptr++)
+		;
+
+	if (ptr >= srcend)
+		elog(ERROR, "could not find null terminator in GUC state");
+
+	/* Set the new position at the position after the NULL character */
+	*srcptr = ptr + 1;
+
+	return retptr;
+}
+
+/* Binary read version of read_gucstate(). Copies into dest */
+static void
+read_gucstate_binary(char **srcptr, char *srcend, void *dest, Size size)
+{
+	if (*srcptr + size > srcend)
+		elog(ERROR, "incomplete GUC state");
+
+	memcpy(dest, *srcptr, size);
+	*srcptr += size;
+}
+
+/*
+ * RestoreGUCState:
+ * Reads the GUC state at the specified address and updates the GUCs with the
+ * values read from the GUC state.
+ */
+void
+RestoreGUCState(void *gucstate)
+{
+	char	   *varname,
+			   *varvalue,
+			   *varsourcefile;
+	int			varsourceline;
+	GucSource	varsource;
+	GucContext	varscontext;
+	char	   *srcptr = (char *) gucstate;
+	char	   *srcend;
+	Size		len;
+	int			i;
+
+	/* See comment at can_skip_gucvar(). */
+	for (i = 0; i < num_guc_variables; i++)
+		if (!can_skip_gucvar(guc_variables[i]))
+			InitializeOneGUCOption(guc_variables[i]);
+
+	/* First item is the length of the subsequent data */
+	memcpy(&len, gucstate, sizeof(len));
+
+	srcptr += sizeof(len);
+	srcend = srcptr + len;
+
+	while (srcptr < srcend)
+	{
+		if ((varname = read_gucstate(&srcptr, srcend)) == NULL)
+			break;
+
+		varvalue = read_gucstate(&srcptr, srcend);
+		varsourcefile = read_gucstate(&srcptr, srcend);
+		if (varsourcefile[0])
+			read_gucstate_binary(&srcptr, srcend,
+								 &varsourceline, sizeof(varsourceline));
+		read_gucstate_binary(&srcptr, srcend,
+							 &varsource, sizeof(varsource));
+		read_gucstate_binary(&srcptr, srcend,
+							 &varscontext, sizeof(varscontext));
+
+		(void) set_config_option(varname, varvalue,
+								 varscontext, varsource,
+								 GUC_ACTION_SET, true, 0);
+		if (varsourcefile[0])
+			set_config_sourcefile(varname, varsourcefile, varsourceline);
+	}
+}
 
 /*
  * A little "long argument" simulation, although not quite GNU
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 0a729c1..47cac5c 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -351,6 +351,11 @@ extern void write_nondefault_variables(GucContext context);
 extern void read_nondefault_variables(void);
 #endif
 
+/* GUC serialization */
+extern Size EstimateGUCStateSpace(void);
+extern void SerializeGUCState(Size maxsize, char *start_address);
+extern void RestoreGUCState(void *gucstate);
+
 /* Support for messages reported from GUC check hooks */
 
 extern PGDLLIMPORT char *GUC_check_errmsg_string;
-- 
1.7.9.6 (Apple Git-31.1)

commit 5d550ec6627d25932bebaedf589e6928b6f21ea1
Author: Robert Haas <rh...@postgresql.org>
Date:   Fri Jul 11 09:53:40 2014 -0400

    pg_background: Run commands in a background worker, and get the results.
    
    The currently-active GUC values from the user session will be copied
    to the background worker.  If the command returns a result set, you
    can retrieve the result set; if not, you can retrieve the command
    tags.  If the command fails with an error, the same error will be
    thrown in the launching process when the results are retrieved.
    Warnings and other messages generated by the background worker, and
    notifications received by it, are also propagated to the foreground
    process.
    
    Patch by me; review by Amit Kapila and Andres Freund.
    
    V2: Refactor to reduce differences with exec_simple_query; other
    cleanups per Amit.
    
    V3: Per Andres, fix whitespace damage and use NameData instead of
    char[NAMEDATALEN].
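
To make the error propagation described above a little more concrete, here is a minimal standalone sketch of decoding the body of an ErrorResponse or NoticeResponse message, which is the job pq_parse_errornotice() does in this series. The body is a run of fields, each a one-byte code followed by a NUL-terminated string, ended by a zero byte. The demo_* names are hypothetical, only the 'S', 'C' and 'M' fields are kept, and unlike the patch this sketch silently skips other fields and returns -1 instead of calling elog().

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical holder for the three fields this sketch keeps. */
typedef struct
{
	const char *severity;		/* field 'S' */
	char		sqlstate[6];	/* field 'C': five characters plus NUL */
	const char *message;		/* field 'M' */
} demo_notice;

/*
 * Decode a message body (the bytes after the 'E' or 'N' type byte).
 * Returns 0 on success, -1 if the body is malformed.
 */
static int
demo_parse_notice(const char *body, size_t len, demo_notice *out)
{
	const char *p = body;
	const char *end = body + len;

	memset(out, 0, sizeof(*out));
	while (p < end)
	{
		char		code = *p++;
		const char *value;
		const char *nul;

		/* A zero code byte terminates the field list. */
		if (code == '\0')
			return 0;

		/* Each field value must be NUL-terminated within the body. */
		nul = memchr(p, '\0', (size_t) (end - p));
		if (nul == NULL)
			return -1;
		value = p;
		p = nul + 1;

		switch (code)
		{
			case 'S':
				out->severity = value;
				break;
			case 'C':
				if (strlen(value) != 5)
					return -1;	/* malformed SQLSTATE */
				memcpy(out->sqlstate, value, 6);
				break;
			case 'M':
				out->message = value;
				break;
			default:
				break;			/* other fields ignored in this sketch */
		}
	}
	return -1;					/* ran off the end without a terminator */
}
```

Given the body "SERROR\0C22012\0Mdivision by zero\0" plus the terminating zero byte, this fills in severity "ERROR", SQLSTATE "22012" and the primary message. The launching backend would then rebuild an ErrorData from such fields and rethrow it.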

diff --git a/contrib/Makefile b/contrib/Makefile
index b37d0dd..11d6116 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -30,6 +30,7 @@ SUBDIRS = \
 		pageinspect	\
 		passwordcheck	\
 		pg_archivecleanup \
+		pg_background \
 		pg_buffercache	\
 		pg_freespacemap \
 		pg_prewarm	\
diff --git a/contrib/pg_background/Makefile b/contrib/pg_background/Makefile
new file mode 100644
index 0000000..c4e717d
--- /dev/null
+++ b/contrib/pg_background/Makefile
@@ -0,0 +1,18 @@
+# contrib/pg_background/Makefile
+
+MODULE_big = pg_background
+OBJS = pg_background.o
+
+EXTENSION = pg_background
+DATA = pg_background--1.0.sql
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_background
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_background/pg_background--1.0.sql b/contrib/pg_background/pg_background--1.0.sql
new file mode 100644
index 0000000..5fa5ddb
--- /dev/null
+++ b/contrib/pg_background/pg_background--1.0.sql
@@ -0,0 +1,17 @@
+/* contrib/pg_background/pg_background--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION pg_background" to load this file. \quit
+
+CREATE FUNCTION pg_background_launch(sql pg_catalog.text,
+					   queue_size pg_catalog.int4 DEFAULT 65536)
+    RETURNS pg_catalog.int4 STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_result(pid pg_catalog.int4)
+    RETURNS SETOF pg_catalog.record STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION pg_background_detach(pid pg_catalog.int4)
+    RETURNS pg_catalog.void STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/pg_background/pg_background.c b/contrib/pg_background/pg_background.c
new file mode 100644
index 0000000..f807781
--- /dev/null
+++ b/contrib/pg_background/pg_background.c
@@ -0,0 +1,935 @@
+/*--------------------------------------------------------------------------
+ *
+ * pg_background.c
+ *		Run SQL commands using a background worker.
+ *
+ * Copyright (C) 2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/pg_background/pg_background.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "fmgr.h"
+
+#include "access/htup_details.h"
+#include "access/printtup.h"
+#include "access/xact.h"
+#include "catalog/pg_type.h"
+#include "commands/async.h"
+#include "commands/dbcommands.h"
+#include "funcapi.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "miscadmin.h"
+#include "parser/analyze.h"
+#include "pgstat.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "tcop/pquery.h"
+#include "tcop/utility.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/ps_status.h"
+#include "utils/snapmgr.h"
+#include "utils/timeout.h"
+
+/* Table-of-contents constants for our dynamic shared memory segment. */
+#define PG_BACKGROUND_MAGIC				0x50674267
+#define PG_BACKGROUND_KEY_FIXED_DATA	0
+#define PG_BACKGROUND_KEY_SQL			1
+#define PG_BACKGROUND_KEY_GUC			2
+#define PG_BACKGROUND_KEY_QUEUE			3
+#define PG_BACKGROUND_NKEYS				4
+
+/* Fixed-size data passed via our dynamic shared memory segment. */
+typedef struct pg_background_fixed_data
+{
+	Oid	database_id;
+	Oid	authenticated_user_id;
+	Oid	current_user_id;
+	int	sec_context;
+	NameData	database;
+	NameData	authenticated_user;
+} pg_background_fixed_data;
+
+/* Private state maintained by the launching backend for IPC. */
+typedef struct pg_background_worker_info
+{
+	pid_t		pid;
+	dsm_segment *seg;
+	BackgroundWorkerHandle *handle;
+	shm_mq_handle *responseq;
+	bool		consumed;
+} pg_background_worker_info;
+
+/* Private state maintained across calls to pg_background_result. */
+typedef struct pg_background_result_state
+{
+	pg_background_worker_info *info;
+	FmgrInfo   *receive_functions;
+	Oid		   *typioparams;
+	bool		has_row_description;
+	List	   *command_tags;
+	bool		complete;
+} pg_background_result_state;
+
+static HTAB *worker_hash;
+
+static void cleanup_worker_info(dsm_segment *, Datum pid_datum);
+static pg_background_worker_info *find_worker_info(pid_t pid);
+static void save_worker_info(pid_t pid, dsm_segment *seg,
+				 BackgroundWorkerHandle *handle,
+				 shm_mq_handle *responseq);
+
+static HeapTuple form_result_tuple(pg_background_result_state *state,
+								   TupleDesc tupdesc, StringInfo msg);
+
+static void handle_sigterm(SIGNAL_ARGS);
+static void execute_sql_string(const char *sql);
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(pg_background_launch);
+PG_FUNCTION_INFO_V1(pg_background_result);
+PG_FUNCTION_INFO_V1(pg_background_detach);
+
+void pg_background_worker_main(Datum);
+
+/*
+ * Start a dynamic background worker to run a user-specified SQL command.
+ */
+Datum
+pg_background_launch(PG_FUNCTION_ARGS)
+{
+	text	   *sql = PG_GETARG_TEXT_PP(0);
+	int32		queue_size = PG_GETARG_INT32(1);
+	int32		sql_len = VARSIZE_ANY_EXHDR(sql);
+	Size		guc_len;
+	Size		segsize;
+	dsm_segment *seg;
+	shm_toc_estimator e;
+	shm_toc    *toc;
+	char	   *sqlp;
+	char	   *gucstate;
+	shm_mq	   *mq;
+	BackgroundWorker worker;
+	BackgroundWorkerHandle *worker_handle;
+	pg_background_fixed_data *fdata;
+	pid_t		pid;
+	shm_mq_handle *responseq;
+	MemoryContext	oldcontext;
+
+	/* Ensure a valid queue size. */
+	if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("queue size must be at least %zu bytes",
+						shm_mq_minimum_size)));
+
+	/* Create dynamic shared memory and table of contents. */
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(pg_background_fixed_data));
+	shm_toc_estimate_chunk(&e, sql_len + 1);
+	guc_len = EstimateGUCStateSpace();
+	shm_toc_estimate_chunk(&e, guc_len);
+	shm_toc_estimate_chunk(&e, (Size) queue_size);
+	shm_toc_estimate_keys(&e, PG_BACKGROUND_NKEYS);
+	segsize = shm_toc_estimate(&e);
+	seg = dsm_create(segsize);
+	toc = shm_toc_create(PG_BACKGROUND_MAGIC, dsm_segment_address(seg),
+						 segsize);
+
+	/* Store fixed-size data in dynamic shared memory. */
+	fdata = shm_toc_allocate(toc, sizeof(pg_background_fixed_data));
+	fdata->database_id = MyDatabaseId;
+	fdata->authenticated_user_id = GetAuthenticatedUserId();
+	GetUserIdAndSecContext(&fdata->current_user_id, &fdata->sec_context);
+	namestrcpy(&fdata->database, get_database_name(MyDatabaseId));
+	namestrcpy(&fdata->authenticated_user,
+			   GetUserNameFromId(fdata->authenticated_user_id));
+	shm_toc_insert(toc, PG_BACKGROUND_KEY_FIXED_DATA, fdata);
+
+	/* Store SQL query in dynamic shared memory. */
+	sqlp = shm_toc_allocate(toc, sql_len + 1);
+	memcpy(sqlp, VARDATA_ANY(sql), sql_len);
+	sqlp[sql_len] = '\0';
+	shm_toc_insert(toc, PG_BACKGROUND_KEY_SQL, sqlp);
+
+	/* Store GUC state in dynamic shared memory. */
+	gucstate = shm_toc_allocate(toc, guc_len);
+	SerializeGUCState(guc_len, gucstate);
+	shm_toc_insert(toc, PG_BACKGROUND_KEY_GUC, gucstate);
+
+	/* Establish message queue in dynamic shared memory. */
+	mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
+					   (Size) queue_size);
+	shm_toc_insert(toc, PG_BACKGROUND_KEY_QUEUE, mq);
+	shm_mq_set_receiver(mq, MyProc);
+
+	/*
+	 * Attach the queue before launching a worker, so that we'll automatically
+	 * detach the queue if we error out.  (Otherwise, the worker might sit
+	 * there trying to write the queue long after we've gone away.)
+	 */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	responseq = shm_mq_attach(mq, seg, NULL);
+	MemoryContextSwitchTo(oldcontext);
+
+	/* Configure a worker. */
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_main = NULL;		/* new worker might not have library loaded */
+	sprintf(worker.bgw_library_name, "pg_background");
+	sprintf(worker.bgw_function_name, "pg_background_worker_main");
+	snprintf(worker.bgw_name, BGW_MAXLEN,
+			 "pg_background by PID %d", MyProcPid);
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+	/* set bgw_notify_pid, so we can detect if the worker stops */
+	worker.bgw_notify_pid = MyProcPid;
+
+	/*
+	 * Register the worker.
+	 *
+	 * We switch contexts so that the background worker handle can outlast
+	 * this transaction.
+	 */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	if (!RegisterDynamicBackgroundWorker(&worker, &worker_handle))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not register background process"),
+			 errhint("You may need to increase max_worker_processes.")));
+	MemoryContextSwitchTo(oldcontext);
+	shm_mq_set_handle(responseq, worker_handle);
+
+	/* Wait for the worker to start. */
+	switch (WaitForBackgroundWorkerStartup(worker_handle, &pid))
+	{
+		case BGWH_STARTED:
+			/* Success. */
+			break;
+		case BGWH_STOPPED:
+			pfree(worker_handle);
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+					 errmsg("could not start background process"),
+					 errhint("More details may be available in the server log.")));
+			break;
+		case BGWH_POSTMASTER_DIED:
+			pfree(worker_handle);
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+					 errmsg("cannot start background processes without postmaster"),
+					 errhint("Kill all remaining database processes and restart the database.")));
+			break;
+		default:
+			elog(ERROR, "unexpected bgworker handle status");
+			break;
+	}
+
+	/* Store the relevant details about this worker for future use. */
+	save_worker_info(pid, seg, worker_handle, responseq);
+
+	/*
+	 * Now that the worker info is saved, we do not need to, and should not,
+	 * automatically detach the segment at resource-owner cleanup time.
+	 */
+	dsm_keep_mapping(seg);
+
+	/* Return the worker's PID. */
+	PG_RETURN_INT32(pid);
+}
+
+/*
+ * Retrieve the results of a background query previously launched in this
+ * session.
+ */
+Datum
+pg_background_result(PG_FUNCTION_ARGS)
+{
+	int32		pid = PG_GETARG_INT32(0);
+	shm_mq_result	res;
+	FuncCallContext *funcctx;
+	TupleDesc	tupdesc;
+	StringInfoData	msg;
+	pg_background_result_state *state;
+
+	/* First-time setup. */
+	if (SRF_IS_FIRSTCALL())
+	{
+		MemoryContext	oldcontext;
+		pg_background_worker_info *info;
+
+		funcctx = SRF_FIRSTCALL_INIT();
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		/* See if we have a connection to the specified PID. */
+		if ((info = find_worker_info(pid)) == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("PID %d is not attached to this session", pid)));
+
+		/* Can't read results twice. */
+		if (info->consumed)
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("results for PID %d have already been consumed", pid)));
+		info->consumed = true;
+
+		/*
+		 * Whether we succeed or fail, a future invocation of this function
+		 * may not try to read from the DSM once we've begun to do so.
+		 * Accordingly, make arrangements to clean things up at end of query.
+		 */
+		dsm_unkeep_mapping(info->seg);
+
+		/* Set up tuple-descriptor based on column definition list. */
+		if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("function returning record called in context "
+							"that cannot accept type record"),
+					 errhint("Try calling the function in the FROM clause "
+							 "using a column definition list.")));
+		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+		/* Cache state that will be needed on every call. */
+		state = palloc0(sizeof(pg_background_result_state));
+		state->info = info;
+		if (funcctx->tuple_desc->natts > 0)
+		{
+			int	natts = funcctx->tuple_desc->natts;
+			int	i;
+
+			state->receive_functions = palloc(sizeof(FmgrInfo) * natts);
+			state->typioparams = palloc(sizeof(Oid) * natts);
+
+			for (i = 0; i < natts; ++i)
+			{
+				Oid	receive_function_id;
+
+				getTypeBinaryInputInfo(funcctx->tuple_desc->attrs[i]->atttypid,
+									   &receive_function_id,
+									   &state->typioparams[i]);
+				fmgr_info(receive_function_id, &state->receive_functions[i]);
+			}
+		}
+		funcctx->user_fctx = state;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+	funcctx = SRF_PERCALL_SETUP();
+	tupdesc = funcctx->tuple_desc;
+	state = funcctx->user_fctx;
+
+	/* Initialize message buffer. */
+	initStringInfo(&msg);
+
+	/* Read and process messages from the shared memory queue. */
+	for (;;)
+	{
+		char		msgtype;
+		Size		nbytes;
+		void	   *data;
+
+		/* Get next message. */
+		res = shm_mq_receive(state->info->responseq, &nbytes, &data, false);
+		if (res != SHM_MQ_SUCCESS)
+			break;
+
+		/*
+		 * Message-parsing routines operate on a null-terminated StringInfo,
+		 * so we must construct one.
+		 */
+		resetStringInfo(&msg);
+		enlargeStringInfo(&msg, nbytes);
+		msg.len = nbytes;
+		memcpy(msg.data, data, nbytes);
+		msg.data[nbytes] = '\0';
+		msgtype = pq_getmsgbyte(&msg);
+
+		/* Dispatch on message type. */
+		switch (msgtype)
+		{
+			case 'E':
+			case 'N':
+				{
+					ErrorData	edata;
+
+					/* Parse ErrorResponse or NoticeResponse. */
+					pq_parse_errornotice(&msg, &edata);
+
+					/*
+					 * Limit the maximum error level to ERROR.  We don't want
+					 * a FATAL inside the background worker to kill the user
+					 * session.
+					 */
+					if (edata.elevel > ERROR)
+						edata.elevel = ERROR;
+
+					/* Rethrow the error. */
+					ThrowErrorData(&edata);
+					break;
+				}
+			case 'A':
+				{
+					/* Propagate NotifyResponse. */
+					pq_putmessage(msg.data[0], &msg.data[1], nbytes - 1);
+					break;
+				}
+			case 'T':
+				{
+					int16	natts = pq_getmsgint(&msg, 2);
+					int16	i;
+
+					if (state->has_row_description)
+						elog(ERROR, "multiple RowDescription messages");
+					state->has_row_description = true;
+					if (natts != tupdesc->natts)
+						ereport(ERROR,
+								(errcode(ERRCODE_DATATYPE_MISMATCH),
+								 errmsg("remote query result rowtype does not match "
+									"the specified FROM clause rowtype")));
+
+					for (i = 0; i < natts; ++i)
+					{
+						Oid		type_id;
+
+						(void) pq_getmsgstring(&msg);	/* name */
+						(void) pq_getmsgint(&msg, 4);	/* table OID */
+						(void) pq_getmsgint(&msg, 2);	/* table attnum */
+						type_id = pq_getmsgint(&msg, 4);	/* type OID */
+						(void) pq_getmsgint(&msg, 2);	/* type length */
+						(void) pq_getmsgint(&msg, 4);	/* typmod */
+						(void) pq_getmsgint(&msg, 2);	/* format code */
+
+						if (type_id != tupdesc->attrs[i]->atttypid)
+							ereport(ERROR,
+									(errcode(ERRCODE_DATATYPE_MISMATCH),
+									 errmsg("remote query result rowtype does not match "
+										"the specified FROM clause rowtype")));
+					}
+
+					pq_getmsgend(&msg);
+
+					break;
+				}
+			case 'D':
+				{
+					/* Handle DataRow message. */
+					HeapTuple	result;
+
+					result = form_result_tuple(state, tupdesc, &msg);
+					SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(result));
+				}
+			case 'C':
+				{
+					/* Handle CommandComplete message. */
+					MemoryContext	oldcontext;
+					const char  *tag = pq_getmsgstring(&msg);
+
+					oldcontext =
+						MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+					state->command_tags = lappend(state->command_tags,
+												  pstrdup(tag));
+					MemoryContextSwitchTo(oldcontext);
+					break;
+				}
+			case 'G':
+			case 'H':
+			case 'W':
+				{
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("COPY protocol not allowed in pg_background")));
+				}
+
+			case 'Z':
+				{
+					/* Handle ReadyForQuery message. */
+					state->complete = true;
+					break;
+				}
+			default:
+				elog(WARNING, "unknown message type: %c (%zu bytes)",
+					 msg.data[0], nbytes);
+				break;
+		}
+	}
+
+	/* Check whether the connection was broken prematurely. */
+	if (!state->complete)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("lost connection to worker process with PID %d",
+					pid)));
+
+	/* If no data rows, return the command tags instead. */
+	if (!state->has_row_description)
+	{
+		if (tupdesc->natts != 1 || tupdesc->attrs[0]->atttypid != TEXTOID)
+			ereport(ERROR,
+					(errcode(ERRCODE_DATATYPE_MISMATCH),
+					 errmsg("remote query did not return a result set, but "
+							"result rowtype is not a single text column")));
+		if (state->command_tags != NIL)
+		{
+			char *tag = linitial(state->command_tags);
+			Datum	value;
+			bool	isnull;
+			HeapTuple	result;
+
+			state->command_tags = list_delete_first(state->command_tags);
+			value = PointerGetDatum(cstring_to_text(tag));
+			isnull = false;
+			result = heap_form_tuple(tupdesc, &value, &isnull);
+			SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(result));
+		}
+	}
+
+	/* We're done! */
+	dsm_detach(state->info->seg);
+	SRF_RETURN_DONE(funcctx);
+}
+
+/*
+ * Parse a DataRow message and form a result tuple.
+ */
+static HeapTuple
+form_result_tuple(pg_background_result_state *state, TupleDesc tupdesc,
+				  StringInfo msg)
+{
+	/* Handle DataRow message. */
+	int16	natts = pq_getmsgint(msg, 2);
+	int16	i;
+	Datum  *values = NULL;
+	bool   *isnull = NULL;
+	StringInfoData	buf;
+
+	if (!state->has_row_description)
+		elog(ERROR, "DataRow not preceded by RowDescription");
+	if (natts != tupdesc->natts)
+		elog(ERROR, "malformed DataRow");
+	if (natts > 0)
+	{
+		values = palloc(natts * sizeof(Datum));
+		isnull = palloc(natts * sizeof(bool));
+	}
+	initStringInfo(&buf);
+
+	for (i = 0; i < natts; ++i)
+	{
+		int32	bytes = pq_getmsgint(msg, 4);
+
+		if (bytes < 0)
+		{
+			values[i] = ReceiveFunctionCall(&state->receive_functions[i],
+											NULL,
+											state->typioparams[i],
+											tupdesc->attrs[i]->atttypmod);
+			isnull[i] = true;
+		}
+		else
+		{
+			resetStringInfo(&buf);
+			appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, bytes), bytes);
+			values[i] = ReceiveFunctionCall(&state->receive_functions[i],
+											&buf,
+											state->typioparams[i],
+											tupdesc->attrs[i]->atttypmod);
+			isnull[i] = false;
+		}
+	}
+
+	pq_getmsgend(msg);
+
+	return heap_form_tuple(tupdesc, values, isnull);
+}
+
+/*
+ * Detach from the dynamic shared memory segment used for communication with
+ * a background worker.  This prevents the worker from stalling waiting for
+ * us to read its results.
+ */
+Datum
+pg_background_detach(PG_FUNCTION_ARGS)
+{
+	int32		pid = PG_GETARG_INT32(0);
+	pg_background_worker_info *info;
+
+	info = find_worker_info(pid);
+	if (info == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_UNDEFINED_OBJECT),
+				 errmsg("PID %d is not attached to this session", pid)));
+	dsm_detach(info->seg);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * When the dynamic shared memory segment associated with a worker is
+ * cleaned up, we need to clean up our associated private data structures.
+ */
+static void
+cleanup_worker_info(dsm_segment *seg, Datum pid_datum)
+{
+	pid_t	pid = DatumGetInt32(pid_datum);
+	bool	found;
+	pg_background_worker_info *info;
+
+	/* Find any worker info entry for this PID.  If none, we're done. */
+	if ((info = find_worker_info(pid)) == NULL)
+		return;
+
+	/* Free memory used by the BackgroundWorkerHandle. */
+	if (info->handle != NULL)
+	{
+		pfree(info->handle);
+		info->handle = NULL;
+	}
+
+	/* Remove the hashtable entry. */
+	hash_search(worker_hash, (void *) &pid, HASH_REMOVE, &found);
+	if (!found)
+		elog(ERROR, "pg_background worker_hash table corrupted");
+}
+
+/*
+ * Find the background worker information for the worker with a given PID.
+ */
+static pg_background_worker_info *
+find_worker_info(pid_t pid)
+{
+	pg_background_worker_info *info = NULL;
+
+	if (worker_hash != NULL)
+		info = hash_search(worker_hash, (void *) &pid, HASH_FIND, NULL);
+
+	return info;
+}
+
+/*
+ * Save worker information for future IPC.
+ */
+static void
+save_worker_info(pid_t pid, dsm_segment *seg, BackgroundWorkerHandle *handle,
+				 shm_mq_handle *responseq)
+{
+	pg_background_worker_info *info;
+
+	/* If the hash table hasn't been set up yet, do that now. */
+	if (worker_hash == NULL)
+	{
+		HASHCTL	ctl;
+
+		ctl.keysize = sizeof(pid_t);
+		ctl.entrysize = sizeof(pg_background_worker_info);
+		worker_hash = hash_create("pg_background worker_hash", 8, &ctl,
+								  HASH_ELEM);
+	}
+
+	/* Detach any older worker with this PID. */
+	if ((info = find_worker_info(pid)) != NULL)
+		dsm_detach(info->seg);
+
+	/* When the DSM is unmapped, clean everything up. */
+	on_dsm_detach(seg, cleanup_worker_info, Int32GetDatum(pid));
+
+	/* Create a new entry for this worker. */
+	info = hash_search(worker_hash, (void *) &pid, HASH_ENTER, NULL);
+	info->seg = seg;
+	info->handle = handle;
+	info->responseq = responseq;
+	info->consumed = false;
+}
+
+/*
+ * Background worker entrypoint.
+ */
+void
+pg_background_worker_main(Datum main_arg)
+{
+	dsm_segment *seg;
+	shm_toc    *toc;
+	pg_background_fixed_data *fdata;
+	char	   *sql;
+	char	   *gucstate;
+	shm_mq	   *mq;
+	shm_mq_handle *responseq;
+
+	/* Establish signal handlers. */
+	pqsignal(SIGTERM, handle_sigterm);
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up a memory context and resource owner. */
+	Assert(CurrentResourceOwner == NULL);
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "pg_background");
+	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+												 "pg_background session",
+												 ALLOCSET_DEFAULT_MINSIZE,
+												 ALLOCSET_DEFAULT_INITSIZE,
+												 ALLOCSET_DEFAULT_MAXSIZE);
+
+	/* Connect to the dynamic shared memory segment. */
+	seg = dsm_attach(DatumGetInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unable to map dynamic shared memory segment")));
+	toc = shm_toc_attach(PG_BACKGROUND_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			   errmsg("bad magic number in dynamic shared memory segment")));
+
+	/* Find data structures in dynamic shared memory. */
+	fdata = shm_toc_lookup(toc, PG_BACKGROUND_KEY_FIXED_DATA);
+	sql = shm_toc_lookup(toc, PG_BACKGROUND_KEY_SQL);
+	gucstate = shm_toc_lookup(toc, PG_BACKGROUND_KEY_GUC);
+	mq = shm_toc_lookup(toc, PG_BACKGROUND_KEY_QUEUE);
+	shm_mq_set_sender(mq, MyProc);
+	responseq = shm_mq_attach(mq, seg, NULL);
+
+	/* Redirect protocol messages to responseq. */
+	pq_redirect_to_shm_mq(mq, responseq);
+
+	/*
+	 * Initialize our user and database ID based on the string version of
+	 * the data, and then go back and check that we actually got the database
+	 * and user ID that we intended to get.  We do this because it's not
+	 * impossible for the process that started us to die before we get here,
+	 * and the user or database could be renamed in the meantime.  We don't
+	 * want to latch onto the wrong object by accident.  There should probably
+	 * be a variant of BackgroundWorkerInitializeConnection that accepts OIDs
+	 * rather than strings.
+	 */
+	BackgroundWorkerInitializeConnection(NameStr(fdata->database),
+										 NameStr(fdata->authenticated_user));
+	if (fdata->database_id != MyDatabaseId ||
+		fdata->authenticated_user_id != GetAuthenticatedUserId())
+		ereport(ERROR,
+			(errmsg("user or database renamed during pg_background startup")));
+
+	/* Restore GUC values from launching backend. */
+	StartTransactionCommand();
+	RestoreGUCState(gucstate);
+	CommitTransactionCommand();
+
+	/* Restore user ID and security context. */
+	SetUserIdAndSecContext(fdata->current_user_id, fdata->sec_context);
+
+	/* Prepare to execute the query. */
+	SetCurrentStatementStartTimestamp();
+	debug_query_string = sql;
+	pgstat_report_activity(STATE_RUNNING, sql);
+
+	/* Execute the query. */
+	execute_sql_string(sql);
+
+	/* Post-execution cleanup. */
+	ProcessCompletedNotifies();
+	pgstat_report_activity(STATE_IDLE, sql);
+	pgstat_report_stat(true);
+
+	/* Signal that we are done. */
+	ReadyForQuery(DestRemote);
+}
+
+/*
+ * Execute given SQL string.
+ *
+ * Using SPI here would preclude backgrounding commands like VACUUM which one
+ * might very well wish to launch in the background.  So we do this instead.
+ */
+static void
+execute_sql_string(const char *sql)
+{
+	List	   *raw_parsetree_list;
+	ListCell   *lc1;
+	bool		isTopLevel;
+	int			commands_remaining;
+	MemoryContext	parsecontext;
+	MemoryContext	oldcontext;
+
+	/* Start up a transaction command. */
+	start_xact_command();
+
+	/*
+	 * Parse the SQL string into a list of raw parse trees.
+	 *
+	 * Because we allow statements that perform internal transaction control,
+	 * we can't do this in TopTransactionContext; the parse trees might get
+	 * blown away before we're done executing them.
+	 */
+	parsecontext = AllocSetContextCreate(TopMemoryContext,
+										 "pg_background parse/plan",
+										 ALLOCSET_DEFAULT_MINSIZE,
+										 ALLOCSET_DEFAULT_INITSIZE,
+										 ALLOCSET_DEFAULT_MAXSIZE);
+	oldcontext = MemoryContextSwitchTo(parsecontext);
+	raw_parsetree_list = pg_parse_query(sql);
+	commands_remaining = list_length(raw_parsetree_list);
+	isTopLevel = commands_remaining == 1;
+	MemoryContextSwitchTo(oldcontext);
+
+	/*
+	 * Do parse analysis, rule rewrite, planning, and execution for each raw
+	 * parsetree.  We must fully execute each query before beginning parse
+	 * analysis on the next one, since there may be interdependencies.
+	 */
+	foreach(lc1, raw_parsetree_list)
+	{
+		Node	   *parsetree = (Node *) lfirst(lc1);
+		const char *commandTag;
+		char        completionTag[COMPLETION_TAG_BUFSIZE];
+		List       *querytree_list,
+				   *plantree_list;
+		bool		snapshot_set = false;
+		Portal		portal;
+		DestReceiver *receiver;
+		int16		format = 1;
+
+		/*
+		 * We don't allow transaction-control commands like COMMIT and ABORT
+		 * here.  The entire SQL statement is executed as a single transaction
+		 * which commits if no errors are encountered.
+		 */
+		if (IsA(parsetree, TransactionStmt))
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("transaction control statements are not allowed in pg_background")));
+
+		/*
+		 * Get the command name for use in status display (it also becomes the
+		 * default completion tag, down inside PortalRun).  Set ps_status and
+		 * do any special start-of-SQL-command processing needed by the
+		 * destination.
+		 */
+		commandTag = CreateCommandTag(parsetree);
+		set_ps_display(commandTag, false);
+		BeginCommand(commandTag, DestNone);
+
+		/* Set up a snapshot if parse analysis/planning will need one. */
+		if (analyze_requires_snapshot(parsetree))
+		{
+			PushActiveSnapshot(GetTransactionSnapshot());
+			snapshot_set = true;
+		}
+
+		/*
+		 * OK to analyze, rewrite, and plan this query.
+		 *
+		 * As with parsing, we need to make sure this data outlives the
+		 * transaction, because of the possibility that the statement might
+		 * perform internal transaction control.
+		 */
+		oldcontext = MemoryContextSwitchTo(parsecontext);
+		querytree_list = pg_analyze_and_rewrite(parsetree, sql, NULL, 0);
+		plantree_list = pg_plan_queries(querytree_list, 0, NULL);
+
+		/* Done with the snapshot used for parsing/planning */
+		if (snapshot_set)
+			PopActiveSnapshot();
+
+		/* If we got a cancel signal in analysis or planning, quit */
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * Execute the query using the unnamed portal.
+		 */
+		portal = CreatePortal("", true, true);
+		/* Don't display the portal in pg_cursors */
+		portal->visible = false;
+		PortalDefineQuery(portal, NULL, sql, commandTag, plantree_list, NULL);
+		PortalStart(portal, NULL, 0, InvalidSnapshot);
+
+		/* We always use binary format, for efficiency. */
+		PortalSetResultFormat(portal, 1, &format);
+
+		/*
+		 * Tuples returned by any command other than the last are simply
+		 * discarded; but those returned by the last (or only) command are
+		 * redirected to the shared memory queue we're using for communication
+		 * with the launching backend. If the launching backend is gone or has
+		 * detached us, these messages will just get dropped on the floor.
+		 */
+		--commands_remaining;
+		if (commands_remaining > 0)
+			receiver = CreateDestReceiver(DestNone);
+		else
+		{
+			receiver = CreateDestReceiver(DestRemote);
+			SetRemoteDestReceiverParams(receiver, portal);
+		}
+
+		/*
+		 * Only once the portal and destreceiver have been established can
+		 * we return to the transaction context.  All that stuff needs to
+		 * survive an internal commit inside PortalRun!
+		 */
+		MemoryContextSwitchTo(oldcontext);
+
+		/* Here's where we actually execute the command. */
+		(void) PortalRun(portal, FETCH_ALL, isTopLevel, receiver, receiver,
+						 completionTag);
+
+		/* Clean up the receiver. */
+		(*receiver->rDestroy) (receiver);
+
+		/* Clean up the portal. */
+		PortalDrop(portal, false);
+
+		/*
+		 * If this is the last parsetree, close down transaction statement
+		 * before reporting CommandComplete.  Otherwise, we need a
+		 * CommandCounterIncrement.
+		 */
+		if (lnext(lc1) == NULL)
+			finish_xact_command();
+		else
+			CommandCounterIncrement();
+
+		/*
+		 * Send a CommandComplete message even if we suppressed the query
+		 * results.  The user backend will report the command tags in the
+		 * absence of any true query results.
+		 */
+		EndCommand(completionTag, DestRemote);
+	}
+
+	/* Make sure there's not still a transaction open. */
+	finish_xact_command();
+}
+
+/*
+ * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just
+ * like a normal backend.  The next CHECK_FOR_INTERRUPTS() will do the right
+ * thing.
+ */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+
+	if (MyProc)
+		SetLatch(&MyProc->procLatch);
+
+	if (!proc_exit_inprogress)
+	{
+		InterruptPending = true;
+		ProcDiePending = true;
+	}
+
+	errno = save_errno;
+}
diff --git a/contrib/pg_background/pg_background.control b/contrib/pg_background/pg_background.control
new file mode 100644
index 0000000..733d0e1
--- /dev/null
+++ b/contrib/pg_background/pg_background.control
@@ -0,0 +1,4 @@
+comment = 'Run SQL queries in the background'
+default_version = '1.0'
+module_pathname = '$libdir/pg_background'
+relocatable = true
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 61f17bf..e44f5fa 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -190,8 +190,6 @@ static int	errdetail_execute(List *raw_parsetree_list);
 static int	errdetail_params(ParamListInfo params);
 static int	errdetail_abort(void);
 static int	errdetail_recovery_conflict(void);
-static void start_xact_command(void);
-static void finish_xact_command(void);
 static bool IsTransactionExitStmt(Node *parsetree);
 static bool IsTransactionExitStmtList(List *parseTrees);
 static bool IsTransactionStmtList(List *parseTrees);
@@ -2373,7 +2371,7 @@ exec_describe_portal_message(const char *portal_name)
 /*
  * Convenience routines for starting/committing a single command.
  */
-static void
+void
 start_xact_command(void)
 {
 	if (!xact_started)
@@ -2393,7 +2391,7 @@ start_xact_command(void)
 	}
 }
 
-static void
+void
 finish_xact_command(void)
 {
 	if (xact_started)
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index c23f4da..8e84ee7 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -224,7 +224,7 @@ extern PGPROC *PreparedXactProcs;
 
 /* configurable options */
 extern int	DeadlockTimeout;
-extern int	StatementTimeout;
+extern PGDLLIMPORT int	StatementTimeout;
 extern int	LockTimeout;
 extern bool log_lock_waits;
 
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 60f7532..fd3df58 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -69,6 +69,8 @@ extern void RecoveryConflictInterrupt(ProcSignalReason reason); /* called from S
 																 * handler */
 extern void prepare_for_client_read(void);
 extern void client_read_ended(void);
+extern void start_xact_command(void);
+extern void finish_xact_command(void);
 extern void process_postgres_switches(int argc, char *argv[],
 						  GucContext ctx, const char **dbname);
 extern void PostgresMain(int argc, char *argv[],
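
For anyone reading form_result_tuple() above without the protocol
documentation at hand, here is a rough standalone sketch of the DataRow body
layout it walks: a 2-byte big-endian column count, then for each column a
4-byte big-endian length followed by that many bytes, with a negative length
meaning NULL. This is not code from the patch. The names msg_cursor, read_be,
data_row, parse_data_row and MAX_COLS are made up for illustration, and the
cursor only stands in for the StringInfo that the pq_getmsg* routines use.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical read cursor over a message body (stand-in for StringInfo). */
typedef struct
{
	const unsigned char *data;
	size_t		len;
	size_t		pos;
} msg_cursor;

/* Read an nbytes-wide big-endian integer; return 0 on success, -1 on underrun. */
static int
read_be(msg_cursor *c, int nbytes, uint32_t *out)
{
	uint32_t	v = 0;
	int			i;

	if (c->len - c->pos < (size_t) nbytes)
		return -1;
	for (i = 0; i < nbytes; i++)
		v = (v << 8) | c->data[c->pos++];
	*out = v;
	return 0;
}

#define MAX_COLS 16				/* arbitrary cap for this sketch */

typedef struct
{
	int			ncols;
	int32_t		lengths[MAX_COLS];	/* -1 marks a NULL column */
	const unsigned char *values[MAX_COLS];	/* point into the message body */
} data_row;

/*
 * Parse a DataRow body (the message type byte is assumed to have been
 * consumed already).  Returns 0 on success, -1 if the body is malformed.
 */
static int
parse_data_row(const unsigned char *body, size_t len, data_row *row)
{
	msg_cursor	c = {body, len, 0};
	uint32_t	raw;
	int			i;

	if (read_be(&c, 2, &raw) != 0 || raw > MAX_COLS)
		return -1;
	row->ncols = (int) raw;

	for (i = 0; i < row->ncols; i++)
	{
		if (read_be(&c, 4, &raw) != 0)
			return -1;
		if (raw & 0x80000000u)
		{
			/* Negative length: the column is NULL and carries no bytes. */
			row->lengths[i] = -1;
			row->values[i] = NULL;
			continue;
		}
		if (c.len - c.pos < (size_t) raw)
			return -1;
		row->lengths[i] = (int32_t) raw;
		row->values[i] = c.data + c.pos;
		c.pos += (size_t) raw;
	}

	/* Like pq_getmsgend(): trailing bytes mean the message is malformed. */
	return (c.pos == c.len) ? 0 : -1;
}
```

Feeding it the twelve bytes 00 02, 00 00 00 02 'h' 'i', FF FF FF FF should
yield two columns, the first holding the two bytes "hi" and the second NULL.
The patch itself hands each non-NULL value to the type's binary receive
function instead of keeping raw pointers, so treat the struct above purely as
a way to see the framing.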
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
