On Wed, Sep 10, 2014 at 4:01 PM, Robert Haas <robertmh...@gmail.com> wrote:
>> Yes, although my issue with the hooks was not that you only provided them
>> for 2 functions, but the fact that it had no structure and the
>> implementation was "if hook set do this, else do that" which I don't see
>> like a good way of doing it.
>
> We've done it that way in a bunch of other places, like ExecutorRun().
> An advantage of this approach (I think) is that jumping to a fixed
> address is faster than jumping through a function pointer - so with
> the approach I've taken here, the common case where we're talking to
> the client incurs only the overhead of a null-test, and the larger
> overhead of the function pointer jump is incurred only when the hook
> is in use.  Maybe that's not enough of a difference to matter to
> anything, but I think the contention that I've invented some novel
> kind of interface here doesn't hold up to scrutiny.  We have lots of
> hooks that work just like what I did here.

Here's what the other approach looks like.  I can't really see doing
this way and then only providing hooks for those two functions, so
this is with hooks for all the send-side stuff.

Original version: 9 files changed, 295 insertions(+), 3 deletions(-)
This version: 9 files changed, 428 insertions(+), 47 deletions(-)

There is admittedly a certain elegance to providing a complete set of
hooks, so maybe this is the way to go.  The remaining patches in the
patch series work with either the old version or this one; the changes
here don't affect anything else.

Anyone else have an opinion on which way is better here?

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
commit 76b9b171f81b1639e9d0379f5066212107a740f6
Author: Robert Haas <rh...@postgresql.org>
Date:   Wed May 28 19:30:21 2014 -0400

    Support frontend-backend protocol communication using a shm_mq.
    
    A background worker can use pq_redirect_to_shm_mq() to direct protocol
    that would normally be sent to the frontend to a shm_mq so that another
    process may read them.
    
    The receiving process may use pq_parse_errornotice() to parse an
    ErrorResponse or NoticeResponse from the background worker and, if
    it wishes, ThrowErrorData() to propagate the error (with or without
    further modification).
    
    V2: Lots more hooks, and a struct!

diff --git a/src/backend/libpq/Makefile b/src/backend/libpq/Makefile
index 8be0572..09410c4 100644
--- a/src/backend/libpq/Makefile
+++ b/src/backend/libpq/Makefile
@@ -15,7 +15,7 @@ include $(top_builddir)/src/Makefile.global
 # be-fsstubs is here for historical reasons, probably belongs elsewhere
 
 OBJS = be-fsstubs.o be-secure.o auth.o crypt.o hba.o ip.o md5.o pqcomm.o \
-       pqformat.o pqsignal.o
+       pqformat.o pqmq.o pqsignal.o
 
 ifeq ($(with_openssl),yes)
 OBJS += be-secure-openssl.o
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 605d891..7c4252d 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -102,7 +102,6 @@
 int			Unix_socket_permissions;
 char	   *Unix_socket_group;
 
-
 /* Where the Unix socket files are (list of palloc'd strings) */
 static List *sock_paths = NIL;
 
@@ -134,16 +133,38 @@ static bool DoingCopyOut;
 
 
 /* Internal functions */
-static void pq_close(int code, Datum arg);
+static void socket_comm_reset(void);
+static void socket_close(int code, Datum arg);
+static void socket_set_nonblocking(bool nonblocking);
+static int	socket_flush(void);
+static int	socket_flush_if_writable(void);
+static bool socket_is_send_pending(void);
+static int	socket_putmessage(char msgtype, const char *s, size_t len);
+static void socket_putmessage_noblock(char msgtype, const char *s, size_t len);
+static void socket_startcopyout(void);
+static void socket_endcopyout(bool errorAbort);
 static int	internal_putbytes(const char *s, size_t len);
 static int	internal_flush(void);
-static void pq_set_nonblocking(bool nonblocking);
+static void socket_set_nonblocking(bool nonblocking);
 
 #ifdef HAVE_UNIX_SOCKETS
 static int	Lock_AF_UNIX(char *unixSocketDir, char *unixSocketPath);
 static int	Setup_AF_UNIX(char *sock_path);
 #endif   /* HAVE_UNIX_SOCKETS */
 
+PQsendMethods PQsendSocketMethods;
+
+static PQsendMethods PqSendSocketMethods = {
+	socket_comm_reset,
+	socket_flush,
+	socket_flush_if_writable,
+	socket_is_send_pending,
+	socket_putmessage,
+	socket_putmessage_noblock,
+	socket_startcopyout,
+	socket_endcopyout
+};
+
 
 /* --------------------------------
  *		pq_init - initialize libpq at backend startup
@@ -152,24 +173,25 @@ static int	Setup_AF_UNIX(char *sock_path);
 void
 pq_init(void)
 {
+	PqSendMethods = &PqSendSocketMethods;
 	PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
 	PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
 	PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
 	PqCommBusy = false;
 	DoingCopyOut = false;
-	on_proc_exit(pq_close, 0);
+	on_proc_exit(socket_close, 0);
 }
 
 /* --------------------------------
- *		pq_comm_reset - reset libpq during error recovery
+ *		socket_comm_reset - reset libpq during error recovery
  *
  * This is called from error recovery at the outer idle loop.  It's
  * just to get us out of trouble if we somehow manage to elog() from
  * inside a pqcomm.c routine (which ideally will never happen, but...)
  * --------------------------------
  */
-void
-pq_comm_reset(void)
+static void
+socket_comm_reset(void)
 {
 	/* Do not throw away pending data, but do reset the busy flag */
 	PqCommBusy = false;
@@ -178,14 +200,14 @@ pq_comm_reset(void)
 }
 
 /* --------------------------------
- *		pq_close - shutdown libpq at backend exit
+ *		socket_close - shutdown libpq at backend exit
  *
  * Note: in a standalone backend MyProcPort will be null,
  * don't crash during exit...
  * --------------------------------
  */
 static void
-pq_close(int code, Datum arg)
+socket_close(int code, Datum arg)
 {
 	if (MyProcPort != NULL)
 	{
@@ -783,15 +805,20 @@ TouchSocketFiles(void)
  */
 
 /* --------------------------------
- *			  pq_set_nonblocking - set socket blocking/non-blocking
+ *			  socket_set_nonblocking - set socket blocking/non-blocking
  *
  * Sets the socket non-blocking if nonblocking is TRUE, or sets it
  * blocking otherwise.
  * --------------------------------
  */
 static void
-pq_set_nonblocking(bool nonblocking)
+socket_set_nonblocking(bool nonblocking)
 {
+	if (MyProcPort == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+				 errmsg("there is no client connection")));
+
 	if (MyProcPort->noblock == nonblocking)
 		return;
 
@@ -844,7 +871,7 @@ pq_recvbuf(void)
 	}
 
 	/* Ensure that we're in blocking mode */
-	pq_set_nonblocking(false);
+	socket_set_nonblocking(false);
 
 	/* Can fill buffer from PqRecvLength and upwards */
 	for (;;)
@@ -935,7 +962,7 @@ pq_getbyte_if_available(unsigned char *c)
 	}
 
 	/* Put the socket into non-blocking mode */
-	pq_set_nonblocking(true);
+	socket_set_nonblocking(true);
 
 	r = secure_read(MyProcPort, c, 1);
 	if (r < 0)
@@ -1194,7 +1221,7 @@ internal_putbytes(const char *s, size_t len)
 		/* If buffer is full, then flush it out */
 		if (PqSendPointer >= PqSendBufferSize)
 		{
-			pq_set_nonblocking(false);
+			socket_set_nonblocking(false);
 			if (internal_flush())
 				return EOF;
 		}
@@ -1210,13 +1237,13 @@ internal_putbytes(const char *s, size_t len)
 }
 
 /* --------------------------------
- *		pq_flush		- flush pending output
+ *		socket_flush		- flush pending output
  *
  *		returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-int
-pq_flush(void)
+static int
+socket_flush(void)
 {
 	int			res;
 
@@ -1224,7 +1251,7 @@ pq_flush(void)
 	if (PqCommBusy)
 		return 0;
 	PqCommBusy = true;
-	pq_set_nonblocking(false);
+	socket_set_nonblocking(false);
 	res = internal_flush();
 	PqCommBusy = false;
 	return res;
@@ -1310,8 +1337,8 @@ internal_flush(void)
  * Returns 0 if OK, or EOF if trouble.
  * --------------------------------
  */
-int
-pq_flush_if_writable(void)
+static int
+socket_flush_if_writable(void)
 {
 	int			res;
 
@@ -1324,7 +1351,7 @@ pq_flush_if_writable(void)
 		return 0;
 
 	/* Temporarily put the socket into non-blocking mode */
-	pq_set_nonblocking(true);
+	socket_set_nonblocking(true);
 
 	PqCommBusy = true;
 	res = internal_flush();
@@ -1333,11 +1360,11 @@ pq_flush_if_writable(void)
 }
 
 /* --------------------------------
- *		pq_is_send_pending	- is there any pending data in the output buffer?
+ *	socket_is_send_pending	- is there any pending data in the output buffer?
  * --------------------------------
  */
-bool
-pq_is_send_pending(void)
+static bool
+socket_is_send_pending(void)
 {
 	return (PqSendStart < PqSendPointer);
 }
@@ -1351,7 +1378,7 @@ pq_is_send_pending(void)
 
 
 /* --------------------------------
- *		pq_putmessage	- send a normal message (suppressed in COPY OUT mode)
+ *		socket_putmessage - send a normal message (suppressed in COPY OUT mode)
  *
  *		If msgtype is not '\0', it is a message type code to place before
  *		the message body.  If msgtype is '\0', then the message has no type
@@ -1375,8 +1402,8 @@ pq_is_send_pending(void)
  *		returns 0 if OK, EOF if trouble
  * --------------------------------
  */
-int
-pq_putmessage(char msgtype, const char *s, size_t len)
+static int
+socket_putmessage(char msgtype, const char *s, size_t len)
 {
 	if (DoingCopyOut || PqCommBusy)
 		return 0;
@@ -1408,8 +1435,8 @@ fail:
  *		If the output buffer is too small to hold the message, the buffer
  *		is enlarged.
  */
-void
-pq_putmessage_noblock(char msgtype, const char *s, size_t len)
+static void
+socket_putmessage_noblock(char msgtype, const char *s, size_t len)
 {
 	int res		PG_USED_FOR_ASSERTS_ONLY;
 	int			required;
@@ -1431,18 +1458,18 @@ pq_putmessage_noblock(char msgtype, const char *s, size_t len)
 
 
 /* --------------------------------
- *		pq_startcopyout - inform libpq that an old-style COPY OUT transfer
+ *		socket_startcopyout - inform libpq that an old-style COPY OUT transfer
  *			is beginning
  * --------------------------------
  */
-void
-pq_startcopyout(void)
+static void
+socket_startcopyout(void)
 {
 	DoingCopyOut = true;
 }
 
 /* --------------------------------
- *		pq_endcopyout	- end an old-style COPY OUT transfer
+ *		socket_endcopyout	- end an old-style COPY OUT transfer
  *
  *		If errorAbort is indicated, we are aborting a COPY OUT due to an error,
  *		and must send a terminator line.  Since a partial data line might have
@@ -1451,8 +1478,8 @@ pq_startcopyout(void)
  *		not allow binary transfers, so a textual terminator is always correct.
  * --------------------------------
  */
-void
-pq_endcopyout(bool errorAbort)
+static void
+socket_endcopyout(bool errorAbort)
 {
 	if (!DoingCopyOut)
 		return;
@@ -1462,7 +1489,6 @@ pq_endcopyout(bool errorAbort)
 	DoingCopyOut = false;
 }
 
-
 /*
  * Support for TCP Keepalive parameters
  */
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
new file mode 100644
index 0000000..4a3c8b1
--- /dev/null
+++ b/src/backend/libpq/pqmq.c
@@ -0,0 +1,261 @@
+/*-------------------------------------------------------------------------
+ *
+ * pqmq.c
+ *	  Use the frontend/backend protocol for communication over a shm_mq
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *	src/backend/libpq/pqmq.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+
+static shm_mq *pq_mq;
+static shm_mq_handle *pq_mq_handle;
+static bool pq_mq_busy = false;
+
+static void mq_comm_reset(void);
+static int	mq_flush(void);
+static int	mq_flush_if_writable(void);
+static bool mq_is_send_pending(void);
+static int	mq_putmessage(char msgtype, const char *s, size_t len);
+static void mq_putmessage_noblock(char msgtype, const char *s, size_t len);
+static void mq_startcopyout(void);
+static void mq_endcopyout(bool errorAbort);
+
+static PQsendMethods PqSendMqMethods = {
+	mq_comm_reset,
+	mq_flush,
+	mq_flush_if_writable,
+	mq_is_send_pending,
+	mq_putmessage,
+	mq_putmessage_noblock,
+	mq_startcopyout,
+	mq_endcopyout
+};
+
+/*
+ * Arrange to redirect frontend/backend protocol messages to a shared-memory
+ * message queue.
+ */
+void
+pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
+{
+	PqSendMethods = &PqSendMqMethods;
+	pq_mq = mq;
+	pq_mq_handle = mqh;
+	whereToSendOutput = DestRemote;
+	FrontendProtocol = PG_PROTOCOL_LATEST;
+}
+
+static void
+mq_comm_reset(void)
+{
+	/* Nothing to do. */
+}
+
+static int
+mq_flush(void)
+{
+	/* Nothing to do. */
+	return 0;
+}
+
+static int
+mq_flush_if_writable(void)
+{
+	/* Nothing to do. */
+	return 0;
+}
+
+static bool
+mq_is_send_pending(void)
+{
+	/* There's never anything pending. */
+	return 0;
+}
+
+/*
+ * Transmit a libpq protocol message to the shared memory message queue
+ * selected via pq_mq_handle.  We don't include a length word, because the
+ * receiver will know the length of the message from shm_mq_receive().
+ */
+static int
+mq_putmessage(char msgtype, const char *s, size_t len)
+{
+	shm_mq_iovec	iov[2];
+	shm_mq_result	result;
+
+	/*
+	 * If we're sending a message, and we have to wait because the
+	 * queue is full, and then we get interrupted, and that interrupt
+	 * results in trying to send another message, we respond by detaching
+	 * the queue.  There's no way to return to the original context, but
+	 * even if there were, just queueing the message would amount to
+	 * indefinitely postponing the response to the interrupt.  So we do
+	 * this instead.
+	 */
+	if (pq_mq_busy)
+	{
+		if (pq_mq != NULL)
+			shm_mq_detach(pq_mq);
+		pq_mq = NULL;
+		return EOF;
+	}
+
+	pq_mq_busy = true;
+
+	iov[0].data = &msgtype;
+	iov[0].len = 1;
+	iov[1].data = s;
+	iov[1].len = len;
+
+	Assert(pq_mq_handle != NULL);
+	result = shm_mq_sendv(pq_mq_handle, iov, 2, false);
+
+	pq_mq_busy = false;
+
+	Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
+	if (result != SHM_MQ_SUCCESS)
+		return EOF;
+	return 0;
+}
+
+static void
+mq_putmessage_noblock(char msgtype, const char *s, size_t len)
+{
+	/*
+	 * While the shm_mq machinery does support sending a message in
+	 * non-blocking mode, there's currently no way to try sending beginning
+	 * to send the message that doesn't also commit us to completing the
+	 * transmission.  This could be improved in the future, but for now
+	 * we don't need it.
+	 */
+	elog(ERROR, "not currently supported");
+}
+
+static void
+mq_startcopyout(void)
+{
+	/* Nothing to do. */
+}
+
+static void
+mq_endcopyout(bool errorAbort)
+{
+	/* Nothing to do. */
+}
+
+/*
+ * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
+ * structure with the results.
+ */
+void
+pq_parse_errornotice(StringInfo msg, ErrorData *edata)
+{
+	/* Initialize edata with reasonable defaults. */
+	MemSet(edata, 0, sizeof(ErrorData));
+	edata->elevel = ERROR;
+	edata->assoc_context = CurrentMemoryContext;
+
+	/* Loop over fields and extract each one. */
+	for (;;)
+	{
+		char	code = pq_getmsgbyte(msg);
+		const char *value;
+
+		if (code == '\0')
+		{
+			pq_getmsgend(msg);
+			break;
+		}
+	 	value = pq_getmsgstring(msg);
+
+		switch (code)
+		{
+			case PG_DIAG_SEVERITY:
+				if (strcmp(value, "DEBUG") == 0)
+					edata->elevel = DEBUG1;	/* or some other DEBUG level */
+				else if (strcmp(value, "LOG") == 0)
+					edata->elevel = LOG;	/* can't be COMMERROR */
+				else if (strcmp(value, "INFO") == 0)
+					edata->elevel = INFO;
+				else if (strcmp(value, "NOTICE") == 0)
+					edata->elevel = NOTICE;
+				else if (strcmp(value, "WARNING") == 0)
+					edata->elevel = WARNING;
+				else if (strcmp(value, "ERROR") == 0)
+					edata->elevel = ERROR;
+				else if (strcmp(value, "FATAL") == 0)
+					edata->elevel = FATAL;
+				else if (strcmp(value, "PANIC") == 0)
+					edata->elevel = PANIC;
+				else
+					elog(ERROR, "unknown error severity");
+				break;
+			case PG_DIAG_SQLSTATE:
+				if (strlen(value) != 5)
+					elog(ERROR, "malformed sql state");
+				edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2],
+												  value[3], value[4]);
+				break;
+			case PG_DIAG_MESSAGE_PRIMARY:
+				edata->message = pstrdup(value);
+				break;
+			case PG_DIAG_MESSAGE_DETAIL:
+				edata->detail = pstrdup(value);
+				break;
+			case PG_DIAG_MESSAGE_HINT:
+				edata->hint = pstrdup(value);
+				break;
+			case PG_DIAG_STATEMENT_POSITION:
+				edata->cursorpos = pg_atoi(value, sizeof(int), '\0');
+				break;
+			case PG_DIAG_INTERNAL_POSITION:
+				edata->internalpos = pg_atoi(value, sizeof(int), '\0');
+				break;
+			case PG_DIAG_INTERNAL_QUERY:
+				edata->internalquery = pstrdup(value);
+				break;
+			case PG_DIAG_CONTEXT:
+				edata->context = pstrdup(value);
+				break;
+			case PG_DIAG_SCHEMA_NAME:
+				edata->schema_name = pstrdup(value);
+				break;
+			case PG_DIAG_TABLE_NAME:
+				edata->table_name = pstrdup(value);
+				break;
+			case PG_DIAG_COLUMN_NAME:
+				edata->column_name = pstrdup(value);
+				break;
+			case PG_DIAG_DATATYPE_NAME:
+				edata->datatype_name = pstrdup(value);
+				break;
+			case PG_DIAG_CONSTRAINT_NAME:
+				edata->constraint_name = pstrdup(value);
+				break;
+			case PG_DIAG_SOURCE_FILE:
+				edata->filename = pstrdup(value);
+				break;
+			case PG_DIAG_SOURCE_LINE:
+				edata->lineno = pg_atoi(value, sizeof(int), '\0');
+				break;
+			case PG_DIAG_SOURCE_FUNCTION:
+				edata->funcname = pstrdup(value);
+				break;
+			default:
+				elog(ERROR, "unknown error field: %d", (int) code);
+				break;
+		}
+	}
+}
diff --git a/src/backend/utils/adt/numutils.c b/src/backend/utils/adt/numutils.c
index ca5a8a5..1d13363 100644
--- a/src/backend/utils/adt/numutils.c
+++ b/src/backend/utils/adt/numutils.c
@@ -34,7 +34,7 @@
  * overflow.
  */
 int32
-pg_atoi(char *s, int size, int c)
+pg_atoi(const char *s, int size, int c)
 {
 	long		l;
 	char	   *badp;
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 32a9663..2316464 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1577,6 +1577,57 @@ FlushErrorState(void)
 }
 
 /*
+ * ThrowErrorData --- report an error described by an ErrorData structure
+ *
+ * This is intended to be used to re-report errors originally thrown by
+ * background worker processes and then propagated (with or without
+ * modification) to the backend responsible for them.
+ */
+void
+ThrowErrorData(ErrorData *edata)
+{
+	ErrorData *newedata;
+	MemoryContext	oldcontext;
+
+	if (!errstart(edata->elevel, edata->filename, edata->lineno,
+				  edata->funcname, NULL))
+		return;
+
+	newedata = &errordata[errordata_stack_depth];
+	oldcontext = MemoryContextSwitchTo(edata->assoc_context);
+
+	/* Copy the supplied fields to the error stack. */
+	if (edata->sqlerrcode > 0)
+		newedata->sqlerrcode = edata->sqlerrcode;
+	if (edata->message)
+		newedata->message = pstrdup(edata->message);
+	if (edata->detail)
+		newedata->detail = pstrdup(edata->detail);
+	if (edata->detail_log)
+		newedata->detail_log = pstrdup(edata->detail_log);
+	if (edata->hint)
+		newedata->hint = pstrdup(edata->hint);
+	if (edata->context)
+		newedata->context = pstrdup(edata->context);
+	if (edata->schema_name)
+		newedata->schema_name = pstrdup(edata->schema_name);
+	if (edata->table_name)
+		newedata->table_name = pstrdup(edata->table_name);
+	if (edata->column_name)
+		newedata->column_name = pstrdup(edata->column_name);
+	if (edata->datatype_name)
+		newedata->datatype_name = pstrdup(edata->datatype_name);
+	if (edata->constraint_name)
+		newedata->constraint_name = pstrdup(edata->constraint_name);
+	if (edata->internalquery)
+		newedata->internalquery = pstrdup(edata->internalquery);
+
+	MemoryContextSwitchTo(oldcontext);
+
+	errfinish(0);
+}
+
+/*
  * ReThrowError --- re-throw a previously copied error
  *
  * A handler can do CopyErrorData/FlushErrorState to get out of the error
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 5da9d8d..0b8db42 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -37,6 +37,31 @@ typedef struct
 	}			u;
 } PQArgBlock;
 
+typedef struct
+{
+	void (*comm_reset)(void);
+	int	(*flush)(void);
+	int	(*flush_if_writable)(void);
+	bool (*is_send_pending)(void);
+	int	(*putmessage)(char msgtype, const char *s, size_t len);
+	void (*putmessage_noblock)(char msgtype, const char *s, size_t len);
+	void (*startcopyout)(void);
+	void (*endcopyout)(bool errorAbort);
+} PQsendMethods;
+
+PQsendMethods *PqSendMethods;
+
+#define pq_comm_reset()	(PqSendMethods->comm_reset())
+#define pq_flush() (PqSendMethods->flush())
+#define pq_flush_if_writable() (PqSendMethods->flush_if_writable())
+#define pq_is_send_pending() (PqSendMethods->is_send_pending())
+#define pq_putmessage(msgtype, s, len) \
+	(PqSendMethods->putmessage(msgtype, s, len))
+#define pq_putmessage_noblock(msgtype, s, len) \
+	(PqSendMethods->putmessage(msgtype, s, len))
+#define pq_startcopyout() (PqSendMethods->startcopyout())
+#define pq_endcopyout(errorAbort) (PqSendMethods->endcopyout(errorAbort))
+
 /*
  * External functions.
  */
@@ -51,7 +76,6 @@ extern int	StreamConnection(pgsocket server_fd, Port *port);
 extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
 extern void pq_init(void);
-extern void pq_comm_reset(void);
 extern int	pq_getbytes(char *s, size_t len);
 extern int	pq_getstring(StringInfo s);
 extern int	pq_getmessage(StringInfo s, int maxlen);
@@ -59,13 +83,6 @@ extern int	pq_getbyte(void);
 extern int	pq_peekbyte(void);
 extern int	pq_getbyte_if_available(unsigned char *c);
 extern int	pq_putbytes(const char *s, size_t len);
-extern int	pq_flush(void);
-extern int	pq_flush_if_writable(void);
-extern bool pq_is_send_pending(void);
-extern int	pq_putmessage(char msgtype, const char *s, size_t len);
-extern void pq_putmessage_noblock(char msgtype, const char *s, size_t len);
-extern void pq_startcopyout(void);
-extern void pq_endcopyout(bool errorAbort);
 
 /*
  * prototypes for functions in be-secure.c
@@ -75,6 +92,9 @@ extern char *ssl_key_file;
 extern char *ssl_ca_file;
 extern char *ssl_crl_file;
 
+extern int	(*pq_putmessage_hook)(char msgtype, const char *s, size_t len);
+extern int  (*pq_flush_hook)(void);
+
 extern int	secure_initialize(void);
 extern bool secure_loaded_verify_locations(void);
 extern void secure_destroy(void);
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
new file mode 100644
index 0000000..6bb24d9
--- /dev/null
+++ b/src/include/libpq/pqmq.h
@@ -0,0 +1,22 @@
+/*-------------------------------------------------------------------------
+ *
+ * pqmq.h
+ *	  Use the frontend/backend protocol for communication over a shm_mq
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/libpq/pqmq.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PQMQ_H
+#define PQMQ_H
+
+#include "storage/shm_mq.h"
+
+extern void	pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *);
+
+extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
+
+#endif   /* PQMQ_H */
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 78cc0a0..e1bea54 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -285,7 +285,7 @@ extern Datum current_schema(PG_FUNCTION_ARGS);
 extern Datum current_schemas(PG_FUNCTION_ARGS);
 
 /* numutils.c */
-extern int32 pg_atoi(char *s, int size, int c);
+extern int32 pg_atoi(const char *s, int size, int c);
 extern void pg_itoa(int16 i, char *a);
 extern void pg_ltoa(int32 l, char *a);
 extern void pg_lltoa(int64 ll, char *a);
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 92073be..87438b8 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -415,6 +415,7 @@ extern ErrorData *CopyErrorData(void);
 extern void FreeErrorData(ErrorData *edata);
 extern void FlushErrorState(void);
 extern void ReThrowError(ErrorData *edata) __attribute__((noreturn));
+extern void ThrowErrorData(ErrorData *edata);
 extern void pg_re_throw(void) __attribute__((noreturn));
 
 extern char *GetErrorContextStack(void);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to