> For additional entertainment, I include patches that integrate
> background sessions into dblink.  So dblink can open a connection to a
> background session, and then you can use the existing dblink functions
> to send queries, read results, etc.  People use dblink to make
> self-connections to get autonomous subsessions, so this would directly
> address that use case.  The 0001 patch is some prerequisite refactoring
> to remove an ugly macro mess, which is useful independent of this.  0002
> is the actual patch.

Updated patch, mainly with improved error handling and some tidying up.

Related to this is also the patch in
<https://www.postgresql.org/message-id/d100f62a-0606-accc-693b-cdc6d16b9...@2ndquadrant.com>
as a resource control mechanism.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
>From 48058db2817b8955ea3424bc77b55bb5782ffb0d Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Wed, 28 Dec 2016 12:00:00 -0500
Subject: [PATCH v3] Add background sessions

This adds a C API to run SQL statements in a background worker,
communicating by FE/BE protocol over a DSM message queue.  This can be
used to execute statements and transactions separate from the main
foreground session.

Also included is a PL/Python interface to this functionality.
---
 doc/src/sgml/bgsession.sgml                     | 273 +++++++
 doc/src/sgml/filelist.sgml                      |   1 +
 doc/src/sgml/plpython.sgml                      | 105 ++-
 doc/src/sgml/postgres.sgml                      |   1 +
 doc/src/sgml/stylesheet-common.xsl              |   1 +
 src/backend/commands/variable.c                 |   5 +
 src/backend/libpq/pqmq.c                        |  26 +-
 src/backend/storage/ipc/shm_mq.c                |  19 +
 src/backend/tcop/Makefile                       |   2 +-
 src/backend/tcop/bgsession.c                    | 929 ++++++++++++++++++++++++
 src/backend/tcop/postgres.c                     |  38 +-
 src/include/commands/variable.h                 |   1 +
 src/include/libpq/pqmq.h                        |   1 +
 src/include/storage/shm_mq.h                    |   3 +
 src/include/tcop/bgsession.h                    |  32 +
 src/include/tcop/tcopprot.h                     |  10 +
 src/pl/plpython/Makefile                        |   2 +
 src/pl/plpython/expected/plpython_bgsession.out | 217 ++++++
 src/pl/plpython/expected/plpython_test.out      |   7 +-
 src/pl/plpython/plpy_bgsession.c                | 509 +++++++++++++
 src/pl/plpython/plpy_bgsession.h                |  18 +
 src/pl/plpython/plpy_main.h                     |   3 +
 src/pl/plpython/plpy_planobject.c               |   1 +
 src/pl/plpython/plpy_planobject.h               |   2 +
 src/pl/plpython/plpy_plpymodule.c               |   5 +
 src/pl/plpython/plpy_spi.c                      |  29 +-
 src/pl/plpython/plpy_spi.h                      |   3 +
 src/pl/plpython/sql/plpython_bgsession.sql      | 177 +++++
 28 files changed, 2388 insertions(+), 32 deletions(-)
 create mode 100644 doc/src/sgml/bgsession.sgml
 create mode 100644 src/backend/tcop/bgsession.c
 create mode 100644 src/include/tcop/bgsession.h
 create mode 100644 src/pl/plpython/expected/plpython_bgsession.out
 create mode 100644 src/pl/plpython/plpy_bgsession.c
 create mode 100644 src/pl/plpython/plpy_bgsession.h
 create mode 100644 src/pl/plpython/sql/plpython_bgsession.sql

diff --git a/doc/src/sgml/bgsession.sgml b/doc/src/sgml/bgsession.sgml
new file mode 100644
index 0000000000..14efb1b495
--- /dev/null
+++ b/doc/src/sgml/bgsession.sgml
@@ -0,0 +1,273 @@
+<!-- doc/src/sgml/bgsession.sgml -->
+
+<chapter id="bgsession">
+ <title>Background Session API</title>
+
+ <para>
+  The background session API is a C API for creating additional database
+  sessions in the background and running SQL statements in them.  A background
+  session behaves like a normal (foreground) session in that it has session
+  state, transactions, can run SQL statements, and so on.  Unlike a foreground
+  session, it is not connected directly to a client.  Instead the foreground
+  session can use this API to execute SQL statements and retrieve their
+  results.  Higher-level integrations, such as in procedural languages, can
+  make this functionality available to clients.  Background sessions are
+  independent from their foreground sessions in their session and transaction
+  state.  So a background session cannot see uncommitted data in foreground
+  sessions or vice versa, and there is no preferential treatment about
+  locking.  Like all sessions, background sessions are separate processes.
+  Foreground and background sessions communicate over shared memory messages
+  queues instead of the sockets that a client/server connection uses.
+ </para>
+
+ <para>
+  Background sessions can be useful in a variety of scenarios when effects
+  that are independent of the foreground session are to be achieved, for
+  example:
+  <itemizedlist>
+   <listitem>
+    <para>
+     Commit data independent of whether a foreground transaction commits, for
+     example for auditing.  A trigger in the foreground session could effect
+     the necessary writes via a background session.
+    </para>
+   </listitem>
+   <listitem>
+    <para>
+     Large changes can be split up into smaller transactions.  A foreground
+     session can orchestrate the logic, for example in a function, while the
+     actual writes and commits are executed in a background session.
+    </para>
+   </listitem>
+  </itemizedlist>
+ </para>
+
+ <sect1 id="bgsession-types">
+  <title>API</title>
+
+  <para>
+   Use <literal>#include "tcop/bgsession.h"</literal> to include the API
+   declarations.
+  </para>
+
+  <sect2>
+   <title>Types</title>
+
+   <variablelist>
+    <varlistentry>
+     <term><type>BackgroundSession</type></term>
+     <listitem>
+      <para>
+       An opaque connection handle.  Multiple background sessions can exist at
+       the same time, and each is prepresented by an instance of this type.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><type>BackgroundSessionPreparedStatement</type></term>
+     <listitem>
+      <para>
+       An opaque handle for a prepared statement, created when the statement
+       is prepared and used when the statement is executed.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><type>BackgroundSessionResult</type></term>
+     <listitem>
+      <para>
+       A handle for a query result, defined as:
+<programlisting>
+typedef struct BackgroundSessionResult
+{
+    TupleDesc   tupdesc;
+    HeapTuple  *tuples;
+    uint64      ntuples;
+    const char *command;
+} BackgroundSessionResult;
+</programlisting>
+       <structfield>tupdesc</structfield> describes the result
+       rows, <symbol>NULL</symbol> if the command does not return
+       rows.  <structfield>tuples</structfield> is an array of
+       size <structfield>ntuples</structfield> with the result
+       rows.  <structfield>command</structfield> is the tag of the executed
+       command.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </sect2>
+
+  <sect2>
+   <title>Functions</title>
+
+   <variablelist>
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSession *<function>BackgroundSessionStart</function></funcdef>
+        <paramdef>void</paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Creates a background session.  This starts the background worker and
+       establishes a connection to it.
+      </para>
+
+      <para>
+       A background session does not automatically end when the foreground
+       transaction or session ends.
+       Use <function>BackgroundSessionEnd()</function> to end a background
+       session.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>void <function>BackgroundSessionEnd</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Ends a background session.  This closes the connection the background
+       worker.
+      </para>
+
+      <para>
+       It is an error to close a background session with a transaction block
+       open.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionResult *<function>BackgroundSessionExecute</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+        <paramdef>const char *<parameter>sql</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Execute an SQL statement and return the result.  Access the fields of
+       the result structure to get details about the command result.  If there
+       is an error, this function does not return.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>void <function>BackgroundSessionSend</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+        <paramdef>const char *<parameter>sql</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Execute an SQL statement, but don't wait for the result.  The result
+       can then be fetched later
+       with <function>BackgroundSessionGetResult()</function>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionResult *<function>BackgroundSessionSend</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Obtain the result of an SQL statement previously sent using
+       with <function>BackgroundSessionSend()</function>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionPreparedStatement *<function>BackgroundSessionPrepare</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+        <paramdef>const char *<parameter>sql</parameter></paramdef>
+        <paramdef>int <parameter>nargs</parameter></paramdef>
+        <paramdef>Oid <parameter>argtypes</parameter>[]</paramdef>
+        <paramdef>const char *<parameter>argnames</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Prepare an SQL statement for later execution
+       by <function>BackgroundSessionExecutePrepared()</function>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionResult *<function>BackgroundSessionExecutePrepared</function></funcdef>
+        <paramdef>BackgroundSessionPreparedStatement *<parameter>stmt</parameter></paramdef>
+        <paramdef>int <parameter>nargs</parameter></paramdef>
+        <paramdef>Datum <parameter>values</parameter>[]</paramdef>
+        <paramdef>bool <parameter>nulls</parameter>[]</paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Execute a statement previously prepared by
+       <function>BackgroundSessionPrepare()</function>.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+
+   <para>
+    Here is a very simple example:
+
+<programlisting>
+#include "tcop/bgsession.h"
+
+void
+myfunc()
+{
+    BackgroundSession *session;
+    BackgroundSessionResult *result;
+
+    session = BackgroundSessionStart();
+
+    result = BackgroundSessionExecute(session, "SELECT ...");
+    elog(INFO, "returned %d rows", list_length(result->tuples));
+
+    BackgroundSessionEnd(session);
+}
+</programlisting>
+   </para>
+  </sect2>
+ </sect1>
+</chapter>
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index e7aa92f914..cdf628e913 100644
--- a/doc/src/sgml/filelist.sgml
+++ b/doc/src/sgml/filelist.sgml
@@ -53,6 +53,7 @@
 <!ENTITY logical-replication    SYSTEM "logical-replication.sgml">
 
 <!-- programmer's guide -->
+<!ENTITY bgsession  SYSTEM "bgsession.sgml">
 <!ENTITY bgworker   SYSTEM "bgworker.sgml">
 <!ENTITY dfunc      SYSTEM "dfunc.sgml">
 <!ENTITY ecpg       SYSTEM "ecpg.sgml">
diff --git a/doc/src/sgml/plpython.sgml b/doc/src/sgml/plpython.sgml
index 46397781be..b36f246c0c 100644
--- a/doc/src/sgml/plpython.sgml
+++ b/doc/src/sgml/plpython.sgml
@@ -966,7 +966,8 @@ <title>Database Access Functions</title>
         <term><literal><function>status</function>()</literal></term>
         <listitem>
          <para>
-          The <function>SPI_execute()</function> return value.
+          The <function>SPI_execute()</function> return value, or 0 if not
+          applicable.
          </para>
         </listitem>
        </varlistentry>
@@ -1357,6 +1358,108 @@ <title>Older Python Versions</title>
   </sect2>
  </sect1>
 
+ <sect1 id="plpython-bgsession">
+  <title>Background Sessions</title>
+
+  <para>
+   PL/Python exposes the background session interface described
+   in <xref linkend="bgsession"> as a Python API.  A background session is
+   represented by a Python object of type <type>plpy.BackgroundSession</type>.
+   It has these methods:
+
+   <variablelist>
+    <varlistentry>
+     <term><literal>.close()</literal></term>
+     <listitem>
+      <para>
+       Close the session.  This must be called or the session will leak.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.execute(<parameter>sql</parameter>)</literal></term>
+     <listitem>
+      <para>
+       Execute an SQL statement.  The return value is a result object of the
+       same type as returned by <literal>plpy.execute</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.prepare(<parameter>sql</parameter>, <replaceable>types</replaceable>)</literal></term>
+     <listitem>
+      <para>
+       Prepare an SQL statement.  The parameter data types are specified as
+       for <literal>plpy.prepare</literal>.  The return value is a plan
+       object.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.execute_prepared(<parameter>plan</parameter>, <replaceable>values</replaceable>)</literal></term>
+     <listitem>
+      <para>
+       Execute a prepared statement.  The parameter values are specified as
+       for <literal>plpy.execute</literal>.  The return value is a result
+       object.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.__enter__()</literal></term>
+     <listitem>
+      <para>
+       This just returns self.  It is supplied so that
+       a <classname>BackgroundSession</classname> object can be used as a
+       context manager.  See example below.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.__exit__(<parameter>exc_type</parameter>, <parameter>exc</parameter>, <parameter>tb</parameter>)</literal></term>
+     <listitem>
+      <para>
+       This is the same as <literal>.close()</literal>.  It is supplied so
+       that a <classname>BackgroundSession</classname> object can be used as a
+       context manager.  (The arguments are ignored.)
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </para>
+
+  <para>
+   A <classname>BackgroundSession</classname> object can be preserved across
+   function calls or passed between functions via the <varname>SD</varname>
+   and <varname>GD</varname> variables.
+  </para>
+
+  <para>
+   Example:
+<programlisting>
+CREATE FUNCTION bulkload() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    for i in range(1, 1001):
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+        if i % 100 == 0:
+            a.execute("COMMIT")
+
+return 0
+$$;
+</programlisting>
+   This function inserts 1000 values into the table and commits after every
+   100 values.
+  </para>
+ </sect1>
+
  <sect1 id="plpython-util">
   <title>Utility Functions</title>
   <para>
diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml
index 4e169d1b18..50f9c3964f 100644
--- a/doc/src/sgml/postgres.sgml
+++ b/doc/src/sgml/postgres.sgml
@@ -221,6 +221,7 @@ <title>Server Programming</title>
 
   &spi;
   &bgworker;
+  &bgsession;
   &logicaldecoding;
   &replication-origins;
 
diff --git a/doc/src/sgml/stylesheet-common.xsl b/doc/src/sgml/stylesheet-common.xsl
index c23d38f128..e620f87ef0 100644
--- a/doc/src/sgml/stylesheet-common.xsl
+++ b/doc/src/sgml/stylesheet-common.xsl
@@ -28,6 +28,7 @@
 </xsl:param>
 
 <xsl:param name="callout.graphics" select="'0'"></xsl:param>
+<xsl:param name="funcsynopsis.style">ansi</xsl:param>
 <xsl:param name="toc.section.depth">2</xsl:param>
 <xsl:param name="linenumbering.extension" select="'0'"></xsl:param>
 <xsl:param name="section.autolabel" select="1"></xsl:param>
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index d75bddd87b..8be22a1b0f 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -671,12 +671,17 @@ show_random_seed(void)
  * SET CLIENT_ENCODING
  */
 
+void (*check_client_encoding_hook)(void);
+
 bool
 check_client_encoding(char **newval, void **extra, GucSource source)
 {
 	int			encoding;
 	const char *canonical_name;
 
+	if (check_client_encoding_hook)
+		check_client_encoding_hook();
+
 	/* Look up the encoding by name */
 	encoding = pg_valid_client_encoding(*newval);
 	if (encoding < 0)
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 96939327c3..412d894b2b 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -48,6 +48,11 @@ static PQcommMethods PqCommMqMethods = {
 	mq_endcopyout
 };
 
+static PQcommMethods *save_PqCommMethods;
+static CommandDest save_whereToSendOutput;
+static ProtocolVersion save_FrontendProtocol;
+static dsm_segment *save_seg;
+
 /*
  * Arrange to redirect frontend/backend protocol messages to a shared-memory
  * message queue.
@@ -55,12 +60,30 @@ static PQcommMethods PqCommMqMethods = {
 void
 pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 {
+	save_PqCommMethods = PqCommMethods;
+	save_whereToSendOutput = whereToSendOutput;
+	save_FrontendProtocol = FrontendProtocol;
+
 	PqCommMethods = &PqCommMqMethods;
 	pq_mq = shm_mq_get_queue(mqh);
 	pq_mq_handle = mqh;
 	whereToSendOutput = DestRemote;
 	FrontendProtocol = PG_PROTOCOL_LATEST;
 	on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
+
+	save_seg = seg;
+}
+
+void
+pq_stop_redirect_to_shm_mq(void)
+{
+	cancel_on_dsm_detach(save_seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
+	PqCommMethods = save_PqCommMethods;
+	whereToSendOutput = save_whereToSendOutput;
+	FrontendProtocol = save_FrontendProtocol;
+	pq_mq = NULL;
+	pq_mq_handle = NULL;
+	save_seg = NULL;
 }
 
 /*
@@ -182,7 +205,8 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 
 	Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
 	if (result != SHM_MQ_SUCCESS)
-		return EOF;
+		ereport(COMMERROR,
+				(errmsg("could not send on message queue: %s", shm_mq_strerror(result))));
 	return 0;
 }
 
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index f5bf807cd6..99ad9854df 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -805,6 +805,25 @@ shm_mq_get_queue(shm_mq_handle *mqh)
 }
 
 /*
+ * Get error message string.
+ */
+const char *
+shm_mq_strerror(shm_mq_result res)
+{
+	switch (res)
+	{
+		case SHM_MQ_SUCCESS:
+			return gettext_noop("Success");
+		case SHM_MQ_WOULD_BLOCK:
+			return gettext_noop("Operation would block");
+		case SHM_MQ_DETACHED:
+			return gettext_noop("Other process has detached queue");
+		default:
+			return gettext_noop("Unknown error");
+	}
+}
+
+/*
  * Write bytes into a shared message queue.
  */
 static shm_mq_result
diff --git a/src/backend/tcop/Makefile b/src/backend/tcop/Makefile
index 674302feb7..ab267673a3 100644
--- a/src/backend/tcop/Makefile
+++ b/src/backend/tcop/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/tcop
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS= dest.o fastpath.o postgres.o pquery.o utility.o
+OBJS= bgsession.o dest.o fastpath.o postgres.o pquery.o utility.o
 
 ifneq (,$(filter $(PORTNAME),cygwin win32))
 override CPPFLAGS += -DWIN32_STACK_RLIMIT=$(WIN32_STACK_RLIMIT)
diff --git a/src/backend/tcop/bgsession.c b/src/backend/tcop/bgsession.c
new file mode 100644
index 0000000000..7578e93e9c
--- /dev/null
+++ b/src/backend/tcop/bgsession.c
@@ -0,0 +1,929 @@
+/*--------------------------------------------------------------------------
+ *
+ * bgsession.c
+ *		Run SQL commands using a background worker.
+ *
+ * Copyright (C) 2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/backend/tcop/bgsession.c
+ *
+ *
+ * This implements a C API to open a background session and run SQL queries
+ * in it.  The session looks much like a normal database connection, but it is
+ * always to the same database, and there is no authentication needed.  The
+ * "backend" for that connection is a background worker.  The normal backend
+ * and the background session worker communicate over the normal FE/BE
+ * protocol.
+ *
+ * Types:
+ *
+ * BackgroundSession -- opaque connection handle
+ * BackgroundSessionPreparedStatement -- opaque prepared statement handle
+ * BackgroundSessionResult -- query result
+ *
+ * Functions:
+ *
+ * BackgroundSessionStart() -- start a session (launches background worker)
+ * and return a handle
+ *
+ * BackgroundSessionEnd() -- close session and free resources
+ *
+ * BackgroundSessionExecute() -- run SQL string and return result (rows or
+ * status)
+ *
+ * BackgroundSessionSend() -- run SQL string without waiting for result
+ *
+ * BackgroundSessionGetResult() -- get result from prior ...Send()
+ *
+ * BackgroundSessionPrepare() -- prepare an SQL string for subsequent
+ * execution
+ *
+ * BackgroundSessionExecutePrepared() -- run prepared statement
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/tupdesc.h"
+#include "access/xact.h"
+#include "commands/async.h"
+#include "commands/variable.h"
+#include "lib/stringinfo.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "tcop/bgsession.h"
+#include "tcop/tcopprot.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+/* Table-of-contents constants for our dynamic shared memory segment. */
+#define BGSESSION_MAGIC					0x50674267
+
+#define BGSESSION_KEY_FIXED_DATA		0
+#define BGSESSION_KEY_GUC				1
+#define BGSESSION_KEY_COMMAND_QUEUE		2
+#define BGSESSION_KEY_RESPONSE_QUEUE	3
+#define BGSESSION_NKEYS					4
+
+#define BGSESSION_QUEUE_SIZE			16384
+
+/* Fixed-size data passed via our dynamic shared memory segment. */
+typedef struct background_session_fixed_data
+{
+	Oid database_id;
+	Oid authenticated_user_id;
+	Oid current_user_id;
+	int sec_context;
+} background_session_fixed_data;
+
+struct BackgroundSession
+{
+	ResourceOwner resowner;
+	dsm_segment *seg;
+	BackgroundWorkerHandle *worker_handle;
+	shm_mq_handle *command_qh;
+	shm_mq_handle *response_qh;
+	int		transaction_status;
+};
+
+struct BackgroundSessionPreparedStatement
+{
+	BackgroundSession *session;
+	Oid		   *argtypes;
+	TupleDesc	tupdesc;
+};
+
+static void bgsession_worker_main(Datum main_arg);
+static void shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg);
+static void bgsession_check_client_encoding_hook(void);
+static TupleDesc TupleDesc_from_RowDescription(StringInfo msg);
+static HeapTuple HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg);
+static void forward_NotifyResponse(StringInfo msg);
+static void rethrow_errornotice(StringInfo msg);
+static void invalid_protocol_message(char msgtype) pg_attribute_noreturn();
+
+
+BackgroundSession *
+BackgroundSessionStart(void)
+{
+	ResourceOwner oldowner;
+	BackgroundWorker worker;
+	pid_t		pid;
+	BackgroundSession *session;
+	shm_toc_estimator e;
+	Size		segsize;
+	Size		guc_len;
+	char	   *gucstate;
+	dsm_segment *seg;
+	shm_toc	   *toc;
+	background_session_fixed_data *fdata;
+	shm_mq	   *command_mq;
+	shm_mq	   *response_mq;
+	BgwHandleStatus bgwstatus;
+	StringInfoData msg;
+	char		msgtype;
+
+	session = palloc(sizeof(*session));
+
+	session->resowner = ResourceOwnerCreate(NULL, "background session");
+
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(background_session_fixed_data));
+	shm_toc_estimate_chunk(&e, BGSESSION_QUEUE_SIZE);
+	shm_toc_estimate_chunk(&e, BGSESSION_QUEUE_SIZE);
+	guc_len = EstimateGUCStateSpace();
+	shm_toc_estimate_chunk(&e, guc_len);
+	shm_toc_estimate_keys(&e, BGSESSION_NKEYS);
+	segsize = shm_toc_estimate(&e);
+	oldowner = CurrentResourceOwner;
+	PG_TRY();
+	{
+		CurrentResourceOwner = session->resowner;
+		seg = dsm_create(segsize, 0);
+	}
+	PG_CATCH();
+	{
+		CurrentResourceOwner = oldowner;
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+	CurrentResourceOwner = oldowner;
+
+	session->seg = seg;
+
+	toc = shm_toc_create(BGSESSION_MAGIC, dsm_segment_address(seg), segsize);
+
+	/* Store fixed-size data in dynamic shared memory. */
+	fdata = shm_toc_allocate(toc, sizeof(*fdata));
+	fdata->database_id = MyDatabaseId;
+	fdata->authenticated_user_id = GetAuthenticatedUserId();
+	GetUserIdAndSecContext(&fdata->current_user_id, &fdata->sec_context);
+	shm_toc_insert(toc, BGSESSION_KEY_FIXED_DATA, fdata);
+
+	/* Store GUC state in dynamic shared memory. */
+	gucstate = shm_toc_allocate(toc, guc_len);
+	SerializeGUCState(guc_len, gucstate);
+	shm_toc_insert(toc, BGSESSION_KEY_GUC, gucstate);
+
+	command_mq = shm_mq_create(shm_toc_allocate(toc, BGSESSION_QUEUE_SIZE),
+							   BGSESSION_QUEUE_SIZE);
+	shm_toc_insert(toc, BGSESSION_KEY_COMMAND_QUEUE, command_mq);
+	shm_mq_set_sender(command_mq, MyProc);
+
+	response_mq = shm_mq_create(shm_toc_allocate(toc, BGSESSION_QUEUE_SIZE),
+								BGSESSION_QUEUE_SIZE);
+	shm_toc_insert(toc, BGSESSION_KEY_RESPONSE_QUEUE, response_mq);
+	shm_mq_set_receiver(response_mq, MyProc);
+
+	session->command_qh = shm_mq_attach(command_mq, seg, NULL);
+	session->response_qh = shm_mq_attach(response_mq, seg, NULL);
+
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_main = bgsession_worker_main;
+	snprintf(worker.bgw_name, BGW_MAXLEN, "background session by PID %d", MyProcPid);
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+	worker.bgw_notify_pid = MyProcPid;
+
+	if (!RegisterDynamicBackgroundWorker(&worker, &session->worker_handle))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not register background process"),
+				 errhint("You might need to increase max_worker_processes.")));
+
+	shm_mq_set_handle(session->command_qh, session->worker_handle);
+	shm_mq_set_handle(session->response_qh, session->worker_handle);
+
+	bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle, &pid);
+	if (bgwstatus != BGWH_STARTED)
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not start background worker")));
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'E':
+			case 'N':
+				rethrow_errornotice(&msg);
+				break;
+			case 'Z':
+				session->transaction_status = pq_getmsgbyte(&msg);
+				pq_getmsgend(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'Z');
+
+	return session;
+}
+
+
+void
+BackgroundSessionEnd(BackgroundSession *session)
+{
+	StringInfoData msg;
+
+	if (session->transaction_status == 'T')
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("background session ended with transaction block open")));
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'X');
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	pfree(session->worker_handle);
+	dsm_detach(session->seg);
+	ResourceOwnerRelease(session->resowner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false);
+	ResourceOwnerDelete(session->resowner);
+	pfree(session);
+}
+
+
+void
+BackgroundSessionSend(BackgroundSession *session, const char *sql)
+{
+	StringInfoData msg;
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'Q');
+	pq_sendstring(&msg, sql);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+}
+
+
+static void
+bgresult_store_tuple(BackgroundSessionResult *result, HeapTuple tuple)
+{
+	AssertArg(result);
+	Assert(result->ntuples <= result->alloced);
+
+	if (result->ntuples >= result->alloced)
+	{
+		uint64	newsize = (result->alloced > 0) ? result->alloced * 2 : 128;
+
+		if (result->tuples == NULL)
+			result->tuples = palloc(newsize * sizeof(*result->tuples));
+		else
+			result->tuples = repalloc(result->tuples, newsize * sizeof(*result->tuples));
+		result->alloced = newsize;
+	}
+
+	result->tuples[result->ntuples++] = tuple;
+}
+
+
+BackgroundSessionResult *
+BackgroundSessionGetResult(BackgroundSession *session)
+{
+	StringInfoData msg;
+	char		msgtype;
+	BackgroundSessionResult *result;
+
+	result = palloc0(sizeof(*result));
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'A':
+				forward_NotifyResponse(&msg);
+				break;
+			case 'C':
+				{
+					const char *tag = pq_getmsgstring(&msg);
+					result->command = pstrdup(tag);
+					pq_getmsgend(&msg);
+					break;
+				}
+			case 'D':
+				if (!result->tupdesc)
+					elog(ERROR, "no T before D");
+				bgresult_store_tuple(result, HeapTuple_from_DataRow(result->tupdesc, &msg));
+				pq_getmsgend(&msg);
+				break;
+			case 'E':
+			case 'N':
+				rethrow_errornotice(&msg);
+				break;
+			case 'T':
+				if (result->tupdesc)
+					elog(ERROR, "already received a T message");
+				result->tupdesc = TupleDesc_from_RowDescription(&msg);
+				pq_getmsgend(&msg);
+				break;
+			case 'Z':
+				session->transaction_status = pq_getmsgbyte(&msg);
+				pq_getmsgend(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'Z');
+
+	return result;
+}
+
+
+BackgroundSessionResult *
+BackgroundSessionExecute(BackgroundSession *session, const char *sql)
+{
+	BackgroundSessionSend(session, sql);
+	return BackgroundSessionGetResult(session);
+}
+
+
+BackgroundSessionPreparedStatement *
+BackgroundSessionPrepare(BackgroundSession *session, const char *sql, int nargs,
+						 Oid argtypes[], const char *argnames[])
+{
+	BackgroundSessionPreparedStatement *result;
+	StringInfoData msg;
+	int			i;
+	char		msgtype;
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'P');
+	pq_sendstring(&msg, "");
+	pq_sendstring(&msg, sql);
+	pq_sendint(&msg, nargs, 2);
+	for (i = 0; i < nargs; i++)
+		pq_sendint(&msg, argtypes[i], 4);
+	if (argnames)
+		for (i = 0; i < nargs; i++)
+			pq_sendstring(&msg, argnames[i]);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	result = palloc0(sizeof(*result));
+	result->session = session;
+	result->argtypes = palloc(nargs * sizeof(*result->argtypes));
+	memcpy(result->argtypes, argtypes, nargs * sizeof(*result->argtypes));
+
+	shm_mq_receive_stringinfo(session->response_qh, &msg);
+	msgtype = pq_getmsgbyte(&msg);
+
+	switch (msgtype)
+	{
+		case '1':
+			break;
+		case 'E':
+			rethrow_errornotice(&msg);
+			break;
+		default:
+			invalid_protocol_message(msgtype);
+			break;
+	}
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'D');
+	pq_sendbyte(&msg, 'S');
+	pq_sendstring(&msg, "");
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'A':
+				forward_NotifyResponse(&msg);
+				break;
+			case 'E':
+				rethrow_errornotice(&msg);
+				break;
+			case 'n':
+				break;
+			case 't':
+				/* ignore for now */
+				break;
+			case 'T':
+				if (result->tupdesc)
+					elog(ERROR, "already received a T message");
+				result->tupdesc = TupleDesc_from_RowDescription(&msg);
+				pq_getmsgend(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'n' && msgtype != 'T');
+
+	return result;
+}
+
+
+BackgroundSessionResult *
+BackgroundSessionExecutePrepared(BackgroundSessionPreparedStatement *stmt, int nargs, Datum values[], bool nulls[])
+{
+	BackgroundSession *session;
+	StringInfoData msg;
+	BackgroundSessionResult *result;
+	char		msgtype;
+	int			i;
+
+	session = stmt->session;
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'B');
+	pq_sendstring(&msg, "");
+	pq_sendstring(&msg, "");
+	pq_sendint(&msg, 1, 2);  /* number of parameter format codes */
+	pq_sendint(&msg, 1, 2);
+	pq_sendint(&msg, nargs, 2);  /* number of parameter values */
+	for (i = 0; i < nargs; i++)
+	{
+		if (nulls[i])
+			pq_sendint(&msg, -1, 4);
+		else
+		{
+			Oid			typsend;
+			bool		typisvarlena;
+			bytea	   *outputbytes;
+
+			getTypeBinaryOutputInfo(stmt->argtypes[i], &typsend, &typisvarlena);
+			outputbytes = OidSendFunctionCall(typsend, values[i]);
+			pq_sendint(&msg, VARSIZE(outputbytes) - VARHDRSZ, 4);
+			pq_sendbytes(&msg, VARDATA(outputbytes), VARSIZE(outputbytes) - VARHDRSZ);
+			pfree(outputbytes);
+		}
+	}
+	pq_sendint(&msg, 1, 2);  /* number of result column format codes */
+	pq_sendint(&msg, 1, 2);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	shm_mq_receive_stringinfo(session->response_qh, &msg);
+	msgtype = pq_getmsgbyte(&msg);
+
+	switch (msgtype)
+	{
+		case '2':
+			break;
+		case 'E':
+			rethrow_errornotice(&msg);
+			break;
+		default:
+			invalid_protocol_message(msgtype);
+			break;
+	}
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'E');
+	pq_sendstring(&msg, "");
+	pq_sendint(&msg, 0, 4);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	result = palloc0(sizeof(*result));
+	result->tupdesc = stmt->tupdesc;
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'A':
+				forward_NotifyResponse(&msg);
+				break;
+			case 'C':
+				{
+					const char *tag = pq_getmsgstring(&msg);
+					result->command = pstrdup(tag);
+					pq_getmsgend(&msg);
+					break;
+				}
+			case 'D':
+				if (!stmt->tupdesc)
+					elog(ERROR, "did not expect any rows");
+				bgresult_store_tuple(result, HeapTuple_from_DataRow(stmt->tupdesc, &msg));
+				pq_getmsgend(&msg);
+				break;
+			case 'E':
+			case 'N':
+				rethrow_errornotice(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'C');
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_putemptymessage('S');
+	pq_stop_redirect_to_shm_mq();
+
+	shm_mq_receive_stringinfo(session->response_qh, &msg);
+	msgtype = pq_getmsgbyte(&msg);
+
+	switch (msgtype)
+	{
+		case 'A':
+			forward_NotifyResponse(&msg);
+			break;
+		case 'Z':
+			session->transaction_status = pq_getmsgbyte(&msg);
+			pq_getmsgend(&msg);
+			break;
+		default:
+			invalid_protocol_message(msgtype);
+			break;
+	}
+
+	return result;
+}
+
+
+static void
+bgsession_worker_main(Datum main_arg)
+{
+	dsm_segment *seg;
+	shm_toc	   *toc;
+	background_session_fixed_data *fdata;
+	char	   *gucstate;
+	shm_mq	   *command_mq;
+	shm_mq	   *response_mq;
+	shm_mq_handle *command_qh;
+	shm_mq_handle *response_qh;
+	StringInfoData msg;
+	char		msgtype;
+
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up a memory context and resource owner. */
+	Assert(CurrentResourceOwner == NULL);
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "background session worker");
+	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+												 "background session",
+												 ALLOCSET_DEFAULT_MINSIZE,
+												 ALLOCSET_DEFAULT_INITSIZE,
+												 ALLOCSET_DEFAULT_MAXSIZE);
+
+	seg = dsm_attach(DatumGetInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not map dynamic shared memory segment")));
+
+	toc = shm_toc_attach(BGSESSION_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/* Find data structures in dynamic shared memory. */
+	fdata = shm_toc_lookup(toc, BGSESSION_KEY_FIXED_DATA);
+
+	gucstate = shm_toc_lookup(toc, BGSESSION_KEY_GUC);
+
+	command_mq = shm_toc_lookup(toc, BGSESSION_KEY_COMMAND_QUEUE);
+	shm_mq_set_receiver(command_mq, MyProc);
+	command_qh = shm_mq_attach(command_mq, seg, NULL);
+
+	response_mq = shm_toc_lookup(toc, BGSESSION_KEY_RESPONSE_QUEUE);
+	shm_mq_set_sender(response_mq, MyProc);
+	response_qh = shm_mq_attach(response_mq, seg, NULL);
+
+	pq_redirect_to_shm_mq(seg, response_qh);
+	BackgroundWorkerInitializeConnectionByOid(fdata->database_id,
+											  fdata->authenticated_user_id);
+
+	SetClientEncoding(GetDatabaseEncoding());
+
+	StartTransactionCommand();
+	RestoreGUCState(gucstate);
+	CommitTransactionCommand();
+
+	process_session_preload_libraries();
+
+	SetUserIdAndSecContext(fdata->current_user_id, fdata->sec_context);
+
+	whereToSendOutput = DestRemote;
+	ReadyForQuery(whereToSendOutput);
+
+	MessageContext = AllocSetContextCreate(TopMemoryContext,
+										   "MessageContext",
+										   ALLOCSET_DEFAULT_MINSIZE,
+										   ALLOCSET_DEFAULT_INITSIZE,
+										   ALLOCSET_DEFAULT_MAXSIZE);
+
+	do
+	{
+		PG_TRY();
+		{
+			MemoryContextSwitchTo(MessageContext);
+			MemoryContextResetAndDeleteChildren(MessageContext);
+
+			ProcessCompletedNotifies();
+			pgstat_report_stat(false);
+			pgstat_report_activity(STATE_IDLE, NULL);
+
+			shm_mq_receive_stringinfo(command_qh, &msg);
+			msgtype = pq_getmsgbyte(&msg);
+
+			switch (msgtype)
+			{
+				case 'B':
+					{
+						SetCurrentStatementStartTimestamp();
+						exec_bind_message(&msg);
+						break;
+					}
+				case 'D':
+					{
+						int         describe_type;
+						const char *describe_target;
+
+						SetCurrentStatementStartTimestamp();
+
+						describe_type = pq_getmsgbyte(&msg);
+						describe_target = pq_getmsgstring(&msg);
+						pq_getmsgend(&msg);
+
+						switch (describe_type)
+						{
+							case 'S':
+								exec_describe_statement_message(describe_target);
+								break;
+#ifdef TODO
+							case 'P':
+								exec_describe_portal_message(describe_target);
+								break;
+#endif
+							default:
+								ereport(ERROR,
+										(errcode(ERRCODE_PROTOCOL_VIOLATION),
+										 errmsg("invalid DESCRIBE message subtype %d",
+												describe_type)));
+								break;
+						}
+					}
+					break;
+				case 'E':
+					{
+						const char *portal_name;
+						int			max_rows;
+
+						SetCurrentStatementStartTimestamp();
+
+						portal_name = pq_getmsgstring(&msg);
+						max_rows = pq_getmsgint(&msg, 4);
+						pq_getmsgend(&msg);
+
+						exec_execute_message(portal_name, max_rows);
+					}
+					break;
+
+				case 'P':
+					{
+						const char *stmt_name;
+						const char *query_string;
+						int			numParams;
+						Oid		   *paramTypes = NULL;
+
+						SetCurrentStatementStartTimestamp();
+
+						stmt_name = pq_getmsgstring(&msg);
+						query_string = pq_getmsgstring(&msg);
+						numParams = pq_getmsgint(&msg, 2);
+						if (numParams > 0)
+						{
+							int			i;
+
+							paramTypes = palloc(numParams * sizeof(Oid));
+							for (i = 0; i < numParams; i++)
+								paramTypes[i] = pq_getmsgint(&msg, 4);
+						}
+						pq_getmsgend(&msg);
+
+						exec_parse_message(query_string, stmt_name,
+										   paramTypes, numParams);
+						break;
+					}
+				case 'Q':
+					{
+						const char *sql;
+
+						sql = pq_getmsgstring(&msg);
+						pq_getmsgend(&msg);
+
+						check_client_encoding_hook = bgsession_check_client_encoding_hook;
+
+						SetCurrentStatementStartTimestamp();
+						exec_simple_query(sql, 1);
+
+						check_client_encoding_hook = NULL;
+
+						ReadyForQuery(whereToSendOutput);
+						break;
+					}
+				case 'S':
+					{
+						pq_getmsgend(&msg);
+						finish_xact_command();
+						ReadyForQuery(whereToSendOutput);
+						break;
+					}
+				case 'X':
+					break;
+				default:
+					ereport(ERROR,
+							(errcode(ERRCODE_PROTOCOL_VIOLATION),
+							 errmsg("invalid protocol message type from background session leader: %c",
+									msgtype)));
+					break;
+			}
+		}
+		PG_CATCH();
+		{
+			EmitErrorReport();
+			AbortCurrentTransaction();
+			MemoryContextSwitchTo(TopMemoryContext);
+			FlushErrorState();
+			reset_xact_command();
+		}
+		PG_END_TRY();
+	}
+	while (msgtype != 'X');
+}
+
+
+static void
+shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg)
+{
+	shm_mq_result res;
+	Size		nbytes;
+	void	   *data;
+
+	res = shm_mq_receive(qh, &nbytes, &data, false);
+	if (res != SHM_MQ_SUCCESS)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not read from message queue: %s", shm_mq_strerror(res))));
+
+	initStringInfo(msg);
+	appendBinaryStringInfo(msg, data, nbytes);
+}
+
+
+static void
+bgsession_check_client_encoding_hook(void)
+{
+	elog(ERROR, "cannot set client encoding in background session");
+}
+
+
+static TupleDesc
+TupleDesc_from_RowDescription(StringInfo msg)
+{
+	TupleDesc	tupdesc;
+	int			natts = pq_getmsgint(msg, 2);
+	int			i;
+
+	tupdesc = CreateTemplateTupleDesc(natts, false);
+	for (i = 0; i < natts; i++)
+	{
+		const char *colname;
+		Oid     type_oid;
+		int32	typmod;
+		int16	format;
+
+		colname = pq_getmsgstring(msg);
+		(void) pq_getmsgint(msg, 4);   /* table OID */
+		(void) pq_getmsgint(msg, 2);   /* table attnum */
+		type_oid = pq_getmsgint(msg, 4);
+		(void) pq_getmsgint(msg, 2);   /* type length */
+		typmod = pq_getmsgint(msg, 4);
+		format = pq_getmsgint(msg, 2);
+		(void) format;
+#ifdef TODO
+		/* XXX The protocol sometimes sends 0 (text) if the format is not
+		 * determined yet.  We always use binary, so this check is probably
+		 * not useful. */
+		if (format != 1)
+			elog(ERROR, "format must be binary");
+#endif
+
+		TupleDescInitEntry(tupdesc, i + 1, colname, type_oid, typmod, 0);
+	}
+	return tupdesc;
+}
+
+
+static HeapTuple
+HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg)
+{
+	int			natts = pq_getmsgint(msg, 2);
+	int			i;
+	Datum	   *values;
+	bool	   *nulls;
+	StringInfoData buf;
+
+	Assert(tupdesc);
+
+	if (natts != tupdesc->natts)
+		elog(ERROR, "malformed DataRow");
+
+	values = palloc(natts * sizeof(*values));
+	nulls = palloc(natts * sizeof(*nulls));
+	initStringInfo(&buf);
+
+	for (i = 0; i < natts; i++)
+	{
+		int32 len = pq_getmsgint(msg, 4);
+
+		if (len < 0)
+			nulls[i] = true;
+		else
+		{
+			Oid recvid;
+			Oid typioparams;
+
+			nulls[i] = false;
+
+			getTypeBinaryInputInfo(tupdesc->attrs[i]->atttypid,
+								   &recvid,
+								   &typioparams);
+			resetStringInfo(&buf);
+			appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, len), len);
+			values[i] = OidReceiveFunctionCall(recvid, &buf, typioparams,
+											   tupdesc->attrs[i]->atttypmod);
+		}
+	}
+
+	return heap_form_tuple(tupdesc, values, nulls);
+}
+
+
+static void
+forward_NotifyResponse(StringInfo msg)
+{
+	int32	pid;
+	const char *channel;
+	const char *payload;
+
+	pid = pq_getmsgint(msg, 4);
+	channel = pq_getmsgrawstring(msg);
+	payload = pq_getmsgrawstring(msg);
+	pq_getmsgend(msg);
+
+	NotifyMyFrontEnd(channel, payload, pid);
+}
+
+
+static void
+rethrow_errornotice(StringInfo msg)
+{
+	ErrorData   edata;
+
+	pq_parse_errornotice(msg, &edata);
+	edata.elevel = Min(edata.elevel, ERROR);
+	ThrowErrorData(&edata);
+}
+
+
+static void
+invalid_protocol_message(char msgtype)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_PROTOCOL_VIOLATION),
+			 errmsg("invalid protocol message type from background session: %c",
+					msgtype)));
+}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index b07d6c6cb9..2e5b601a5e 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -180,8 +180,6 @@ static int	errdetail_execute(List *raw_parsetree_list);
 static int	errdetail_params(ParamListInfo params);
 static int	errdetail_abort(void);
 static int	errdetail_recovery_conflict(void);
-static void start_xact_command(void);
-static void finish_xact_command(void);
 static bool IsTransactionExitStmt(Node *parsetree);
 static bool IsTransactionExitStmtList(List *pstmts);
 static bool IsTransactionStmtList(List *pstmts);
@@ -876,8 +874,8 @@ pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams)
  *
  * Execute a "simple Query" protocol message.
  */
-static void
-exec_simple_query(const char *query_string)
+void
+exec_simple_query(const char *query_string, int16 format)
 {
 	CommandDest dest = whereToSendOutput;
 	MemoryContext oldcontext;
@@ -970,7 +968,6 @@ exec_simple_query(const char *query_string)
 				   *plantree_list;
 		Portal		portal;
 		DestReceiver *receiver;
-		int16		format;
 
 		/*
 		 * Get the command name for use in status display (it also becomes the
@@ -1061,6 +1058,8 @@ exec_simple_query(const char *query_string)
 		 */
 		PortalStart(portal, NULL, 0, InvalidSnapshot);
 
+		if (format < 0)
+		{
 		/*
 		 * Select the appropriate output format: text unless we are doing a
 		 * FETCH from a binary cursor.  (Pretty grotty to have to do this here
@@ -1081,6 +1080,7 @@ exec_simple_query(const char *query_string)
 					format = 1; /* BINARY */
 			}
 		}
+		}
 		PortalSetResultFormat(portal, 1, &format);
 
 		/*
@@ -1192,7 +1192,7 @@ exec_simple_query(const char *query_string)
  *
  * Execute a "Parse" protocol message.
  */
-static void
+void
 exec_parse_message(const char *query_string,	/* string to execute */
 				   const char *stmt_name,		/* name for prepared stmt */
 				   Oid *paramTypes,		/* parameter types */
@@ -1454,7 +1454,7 @@ exec_parse_message(const char *query_string,	/* string to execute */
  *
  * Process a "Bind" message to create a portal from a prepared statement
  */
-static void
+void
 exec_bind_message(StringInfo input_message)
 {
 	const char *portal_name;
@@ -1837,7 +1837,7 @@ exec_bind_message(StringInfo input_message)
  *
  * Process an "Execute" message for a portal
  */
-static void
+void
 exec_execute_message(const char *portal_name, long max_rows)
 {
 	CommandDest dest;
@@ -2064,6 +2064,9 @@ check_log_statement(List *stmt_list)
 {
 	ListCell   *stmt_item;
 
+	if (IsBackgroundWorker)
+		return false;
+
 	if (log_statement == LOGSTMT_NONE)
 		return false;
 	if (log_statement == LOGSTMT_ALL)
@@ -2099,6 +2102,9 @@ check_log_statement(List *stmt_list)
 int
 check_log_duration(char *msec_str, bool was_logged)
 {
+	if (IsBackgroundWorker)
+		return false;
+
 	if (log_duration || log_min_duration_statement >= 0)
 	{
 		long		secs;
@@ -2286,7 +2292,7 @@ errdetail_recovery_conflict(void)
  *
  * Process a "Describe" message for a prepared statement
  */
-static void
+void
 exec_describe_statement_message(const char *stmt_name)
 {
 	CachedPlanSource *psrc;
@@ -2430,7 +2436,7 @@ exec_describe_portal_message(const char *portal_name)
 /*
  * Convenience routines for starting/committing a single command.
  */
-static void
+void
 start_xact_command(void)
 {
 	if (!xact_started)
@@ -2448,7 +2454,7 @@ start_xact_command(void)
 	}
 }
 
-static void
+void
 finish_xact_command(void)
 {
 	if (xact_started)
@@ -2473,6 +2479,12 @@ finish_xact_command(void)
 	}
 }
 
+void
+reset_xact_command(void)
+{
+	xact_started = false;
+}
+
 
 /*
  * Convenience routines for checking whether a statement is one of the
@@ -3891,7 +3903,7 @@ PostgresMain(int argc, char *argv[],
 			ignore_till_sync = true;
 
 		/* We don't have a transaction command open anymore */
-		xact_started = false;
+		reset_xact_command();
 
 		/*
 		 * If an error occurred while we were reading a message from the
@@ -4063,7 +4075,7 @@ PostgresMain(int argc, char *argv[],
 					if (am_walsender)
 						exec_replication_command(query_string);
 					else
-						exec_simple_query(query_string);
+						exec_simple_query(query_string, -1);
 
 					send_ready_for_query = true;
 				}
diff --git a/src/include/commands/variable.h b/src/include/commands/variable.h
index 247423c6fb..2a41d6e59c 100644
--- a/src/include/commands/variable.h
+++ b/src/include/commands/variable.h
@@ -29,6 +29,7 @@ extern bool check_transaction_deferrable(bool *newval, void **extra, GucSource s
 extern bool check_random_seed(double *newval, void **extra, GucSource source);
 extern void assign_random_seed(double newval, void *extra);
 extern const char *show_random_seed(void);
+extern void (*check_client_encoding_hook)(void);
 extern bool check_client_encoding(char **newval, void **extra, GucSource source);
 extern void assign_client_encoding(const char *newval, void *extra);
 extern bool check_session_authorization(char **newval, void **extra, GucSource source);
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
index e356bd60f4..b5bc7abb5a 100644
--- a/src/include/libpq/pqmq.h
+++ b/src/include/libpq/pqmq.h
@@ -17,6 +17,7 @@
 #include "storage/shm_mq.h"
 
 extern void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh);
+extern void pq_stop_redirect_to_shm_mq(void);
 extern void pq_set_parallel_master(pid_t pid, BackendId backend_id);
 
 extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 7a37535ab3..9c1d54ef5d 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -79,6 +79,9 @@ extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
 /* Wait for our counterparty to attach to the queue. */
 extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
 
+/* Get error message string. */
+extern const char *shm_mq_strerror(shm_mq_result res);
+
 /* Smallest possible queue. */
 extern PGDLLIMPORT const Size shm_mq_minimum_size;
 
diff --git a/src/include/tcop/bgsession.h b/src/include/tcop/bgsession.h
new file mode 100644
index 0000000000..744681ef79
--- /dev/null
+++ b/src/include/tcop/bgsession.h
@@ -0,0 +1,32 @@
+#ifndef BGSESSION_H
+#define BGSESSION_H
+
+#include "access/htup.h"
+#include "access/tupdesc.h"
+
+struct BackgroundSession;
+typedef struct BackgroundSession BackgroundSession;
+
+struct BackgroundSessionPreparedStatement;
+typedef struct BackgroundSessionPreparedStatement BackgroundSessionPreparedStatement;
+
+typedef struct BackgroundSessionResult
+{
+	TupleDesc	tupdesc;
+	HeapTuple  *tuples;
+	uint64		ntuples;
+	uint64		alloced;
+	const char *command;
+} BackgroundSessionResult;
+
+BackgroundSession *BackgroundSessionStart(void);
+void BackgroundSessionEnd(BackgroundSession *session);
+
+void BackgroundSessionSend(BackgroundSession *session, const char *sql);
+BackgroundSessionResult *BackgroundSessionGetResult(BackgroundSession *session);
+BackgroundSessionResult *BackgroundSessionExecute(BackgroundSession *session, const char *sql);
+
+BackgroundSessionPreparedStatement *BackgroundSessionPrepare(BackgroundSession *session, const char *sql, int nargs, Oid argtypes[], const char *argnames[]);
+BackgroundSessionResult *BackgroundSessionExecutePrepared(BackgroundSessionPreparedStatement *stmt, int nargs, Datum values[], bool nulls[]);
+
+#endif /* BGSESSION_H */
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 1958be85b7..cea599274e 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -58,6 +58,12 @@ extern PlannedStmt *pg_plan_query(Query *querytree, int cursorOptions,
 			  ParamListInfo boundParams);
 extern List *pg_plan_queries(List *querytrees, int cursorOptions,
 				ParamListInfo boundParams);
+extern void exec_simple_query(const char *query_string, int16 format);
+extern void exec_parse_message(const char *query_string, const char *stmt_name,
+							   Oid paramTypes[], int numParams);
+extern void exec_bind_message(StringInfo input_message);
+extern void exec_execute_message(const char *portal_name, long max_rows);
+extern void exec_describe_statement_message(const char *stmt_name);
 
 extern bool check_max_stack_depth(int *newval, void **extra, GucSource source);
 extern void assign_max_stack_depth(int newval, void *extra);
@@ -71,6 +77,10 @@ extern void RecoveryConflictInterrupt(ProcSignalReason reason); /* called from S
 extern void ProcessClientReadInterrupt(bool blocked);
 extern void ProcessClientWriteInterrupt(bool blocked);
 
+extern void start_xact_command(void);
+extern void finish_xact_command(void);
+extern void reset_xact_command(void);
+
 extern void process_postgres_switches(int argc, char *argv[],
 						  GucContext ctx, const char **dbname);
 extern void PostgresMain(int argc, char *argv[],
diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile
index 7680d49cb6..9895a6e869 100644
--- a/src/pl/plpython/Makefile
+++ b/src/pl/plpython/Makefile
@@ -20,6 +20,7 @@ PGFILEDESC = "PL/Python - procedural language"
 NAME = plpython$(python_majorversion)
 
 OBJS = \
+	plpy_bgsession.o \
 	plpy_cursorobject.o \
 	plpy_elog.o \
 	plpy_exec.o \
@@ -89,6 +90,7 @@ REGRESS = \
 	plpython_quote \
 	plpython_composite \
 	plpython_subtransaction \
+	plpython_bgsession \
 	plpython_drop
 
 REGRESS_PLPYTHON3_MANGLE := $(REGRESS)
diff --git a/src/pl/plpython/expected/plpython_bgsession.out b/src/pl/plpython/expected/plpython_bgsession.out
new file mode 100644
index 0000000000..5b32eab31b
--- /dev/null
+++ b/src/pl/plpython/expected/plpython_bgsession.out
@@ -0,0 +1,217 @@
+CREATE TABLE test1 (a int, b text);
+-- test of independent commit/rollback
+CREATE FUNCTION bgsession_test1() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    for i in range(0, 10):
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+        if i % 2 == 0:
+            a.execute("COMMIT")
+        else:
+            a.execute("ROLLBACK")
+$$;
+SELECT bgsession_test1();
+ bgsession_test1 
+-----------------
+                
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+-- test query result
+CREATE FUNCTION bgsession_test2() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (11)")
+        rv = a.execute("SELECT * FROM test1")
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+$$;
+SELECT bgsession_test2();
+INFO:  <PLyResult status=0 nrows=6 rows=[{'a': 0, 'b': None}, {'a': 2, 'b': None}, {'a': 4, 'b': None}, {'a': 6, 'b': None}, {'a': 8, 'b': None}, {'a': 11, 'b': None}]>
+ bgsession_test2 
+-----------------
+                
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+-- test notice and error forwarding
+CREATE FUNCTION bgsession_test3() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$")
+    a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$")
+$$;
+SELECT bgsession_test3();
+NOTICE:  notice
+ERROR:  plpy.Error: error
+CONTEXT:  Traceback (most recent call last):
+  PL/Python function "bgsession_test3", line 4, in <module>
+    a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$")
+PL/Python function "bgsession_test3"
+-- test prohibit changing client encoding
+CREATE FUNCTION bgsession_test4() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    a.execute("SET client_encoding TO SJIS")
+$$;
+SELECT bgsession_test4();
+ERROR:  plpy.Error: cannot set client encoding in background session
+CONTEXT:  Traceback (most recent call last):
+  PL/Python function "bgsession_test4", line 3, in <module>
+    a.execute("SET client_encoding TO SJIS")
+PL/Python function "bgsession_test4"
+-- test prepared statements
+CREATE FUNCTION bgsession_test5() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"])
+    a.execute_prepared(plan, [1, "one"])
+    a.execute_prepared(plan, [2, "two"])
+$$;
+TRUNCATE test1;
+SELECT bgsession_test5();
+ bgsession_test5 
+-----------------
+                
+(1 row)
+
+SELECT * FROM test1;
+ a |  b  
+---+-----
+ 1 | one
+ 2 | two
+(2 rows)
+
+-- test result from prepared query
+CREATE FUNCTION bgsession_test7() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+        plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"])
+        a.execute_prepared(plan, [11])
+        plan = a.prepare("SELECT * FROM test1")
+        rv = a.execute_prepared(plan, [])
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+$$;
+TRUNCATE test1;
+SELECT bgsession_test7();
+INFO:  <PLyResult status=0 nrows=1 rows=[{'a': 11, 'b': None}]>
+ bgsession_test7 
+-----------------
+                
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+(0 rows)
+
+-- test error when not closing transaction
+CREATE FUNCTION bgsession_test8() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+$$;
+SELECT bgsession_test8();
+ERROR:  background session ended with transaction block open
+CONTEXT:  PL/Python function "bgsession_test8"
+-- test saving session
+CREATE FUNCTION bgsession_test9a() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+bg = plpy.BackgroundSession()
+GD['bg'] = bg
+bg.execute("BEGIN")
+bg.execute("INSERT INTO test1 VALUES (1)")
+
+return 1
+$$;
+CREATE FUNCTION bgsession_test9b() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+bg = GD['bg']
+bg.execute("INSERT INTO test1 VALUES (2)")
+bg.execute("COMMIT")
+bg.close()
+
+return 2
+$$;
+TRUNCATE test1;
+SELECT bgsession_test9a();
+ bgsession_test9a 
+------------------
+                1
+(1 row)
+
+SELECT bgsession_test9b();
+ bgsession_test9b 
+------------------
+                2
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+ 1 | 
+ 2 | 
+(2 rows)
+
+-- test error handling
+CREATE FUNCTION bgsession_test10() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as bg:
+    try:
+        bg.execute("SELECT error")
+    except Exception, ex:
+        plpy.notice("caught exception: %s" % ex)
+
+    try:
+        bg.prepare("SELECT error")
+    except Exception, ex:
+        plpy.notice("caught exception: %s" % ex)
+
+    try:
+        plan = bg.prepare("SELECT $1", ["int4"])
+        bg.execute_prepared(plan, ["foo"])
+    except plpy.Error, ex:
+        plpy.notice("caught exception: %s" % ex)
+$$;
+SELECT bgsession_test10();
+NOTICE:  caught exception: column "error" does not exist
+NOTICE:  caught exception: column "error" does not exist
+NOTICE:  caught exception: invalid input syntax for integer: "foo"
+ bgsession_test10 
+------------------
+                 
+(1 row)
+
+DROP TABLE test1;
diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out
index 847e4cc412..2001b60fa4 100644
--- a/src/pl/plpython/expected/plpython_test.out
+++ b/src/pl/plpython/expected/plpython_test.out
@@ -43,8 +43,9 @@ contents.sort()
 return contents
 $$ LANGUAGE plpythonu;
 select module_contents();
- module_contents 
------------------
+  module_contents  
+-------------------
+ BackgroundSession
  Error
  Fatal
  SPIError
@@ -63,7 +64,7 @@ select module_contents();
  spiexceptions
  subtransaction
  warning
-(18 rows)
+(19 rows)
 
 CREATE FUNCTION elog_test_basic() RETURNS void
 AS $$
diff --git a/src/pl/plpython/plpy_bgsession.c b/src/pl/plpython/plpy_bgsession.c
new file mode 100644
index 0000000000..47d299f73c
--- /dev/null
+++ b/src/pl/plpython/plpy_bgsession.c
@@ -0,0 +1,509 @@
+/*
+ * the PLyBackgroundSession class
+ *
+ * src/pl/plpython/plpy_bgsession.c
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "mb/pg_wchar.h"
+#include "parser/parse_type.h"
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+
+#include "plpython.h"
+
+#include "plpy_bgsession.h"
+
+#include "plpy_elog.h"
+#include "plpy_main.h"
+#include "plpy_planobject.h"
+#include "plpy_spi.h"
+
+
+static PyObject *PLyBackgroundSession_new(PyTypeObject *type, PyObject *args, PyObject *kw);
+static void PLyBackgroundSession_dealloc(PyObject *subxact);
+static PyObject *PLyBackgroundSession_enter(PyObject *self, PyObject *unused);
+static PyObject *PLyBackgroundSession_close(PyObject *self, PyObject *args);
+static PyObject *PLyBackgroundSession_execute(PyObject *self, PyObject *args);
+static PyObject *PLyBackgroundSession_prepare(PyObject *self, PyObject *args);
+static PyObject *PLyBackgroundSession_execute_prepared(PyObject *self, PyObject *args);
+
+static char PLyBackgroundSession_doc[] = {
+	"PostgreSQL background session context manager"
+};
+
+static PyMethodDef PLyBackgroundSession_methods[] = {
+	{"close", PLyBackgroundSession_close, METH_VARARGS, NULL},
+	{"__enter__", PLyBackgroundSession_enter, METH_VARARGS, NULL},
+	{"__exit__", PLyBackgroundSession_close, METH_VARARGS, NULL},
+	{"execute", PLyBackgroundSession_execute, METH_VARARGS, NULL},
+	{"prepare", PLyBackgroundSession_prepare, METH_VARARGS, NULL},
+	{"execute_prepared", PLyBackgroundSession_execute_prepared, METH_VARARGS, NULL},
+	{NULL, NULL, 0, NULL}
+};
+
+static PyTypeObject PLyBackgroundSession_Type = {
+	PyVarObject_HEAD_INIT(NULL, 0)
+	"plpy.BackgroundSession",		/* tp_name */
+	sizeof(PLyBackgroundSession_Object),	/* tp_size */
+	0,								/* tp_itemsize */
+
+	/*
+	 * methods
+	 */
+	PLyBackgroundSession_dealloc,	/* tp_dealloc */
+	0,								/* tp_print */
+	0,								/* tp_getattr */
+	0,								/* tp_setattr */
+	0,								/* tp_compare */
+	0,								/* tp_repr */
+	0,								/* tp_as_number */
+	0,								/* tp_as_sequence */
+	0,								/* tp_as_mapping */
+	0,								/* tp_hash */
+	0,								/* tp_call */
+	0,								/* tp_str */
+	0,								/* tp_getattro */
+	0,								/* tp_setattro */
+	0,								/* tp_as_buffer */
+	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,	/* tp_flags */
+	PLyBackgroundSession_doc,		/* tp_doc */
+	0,								/* tp_traverse */
+	0,								/* tp_clear */
+	0,								/* tp_richcompare */
+	0,								/* tp_weaklistoffset */
+	0,								/* tp_iter */
+	0,								/* tp_iternext */
+	PLyBackgroundSession_methods,	/* tp_tpmethods */
+	0,								/* tp_members */
+	0,								/* tp_getset */
+	0,								/* tp_base */
+	0,								/* tp_dict */
+	0,								/* tp_descr_get */
+	0,								/* tp_descr_set */
+	0,								/* tp_dictoffset */
+	0,								/* tp_init */
+	0,								/* tp_alloc */
+	PLyBackgroundSession_new,		/* tp_new */
+	0,								/* tp_free */
+};
+
+
+int
+PLy_bgsession_init_type(PyObject *module)
+{
+	if (PyType_Ready(&PLyBackgroundSession_Type) < 0)
+		return -1;
+
+	Py_INCREF(&PLyBackgroundSession_Type);
+	if (PyModule_AddObject(module, "BackgroundSession", (PyObject *)&PLyBackgroundSession_Type) < 0)
+		return -1;
+
+	return 0;
+}
+
+static PyObject *
+PLyBackgroundSession_new(PyTypeObject *type, PyObject *args, PyObject *kw)
+{
+	PyObject   *result = type->tp_alloc(type, 0);
+	PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) result;
+	MemoryContext oldcontext;
+
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	bgsession->bgsession = BackgroundSessionStart();
+	MemoryContextSwitchTo(oldcontext);
+
+	return result;
+}
+
+/*
+ * Python requires a dealloc function to be defined
+ */
+static void
+PLyBackgroundSession_dealloc(PyObject *self)
+{
+}
+
+/*
+ * bgsession.__enter__() or bgsession.enter()
+ */
+static PyObject *
+PLyBackgroundSession_enter(PyObject *self, PyObject *unused)
+{
+	Py_INCREF(self);
+	return self;
+}
+
+/*
+ * bgsession.close() or bgsession.__exit__(exc_type, exc, tb)
+ */
+static PyObject *
+PLyBackgroundSession_close(PyObject *self, PyObject *args)
+{
+	PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self;
+
+	if (!bgsession->bgsession)
+	{
+		PLy_exception_set(PyExc_ValueError, "this background session has already been closed");
+		return NULL;
+	}
+
+	BackgroundSessionEnd(bgsession->bgsession);
+	bgsession->bgsession = NULL;
+
+	Py_INCREF(Py_None);
+	return Py_None;
+}
+
+/*
+ * Use in PG_CATCH() clause to set Python exception from error data.
+ */
+static void
+PLy_bgsession_handle_error(MemoryContext oldcontext)
+{
+	ErrorData  *edata;
+
+	MemoryContextSwitchTo(oldcontext);
+	edata = CopyErrorData();
+	FlushErrorState();
+	PLy_exception_set_with_details(PLy_exc_error, edata);
+	FreeErrorData(edata);
+}
+
+static PyObject *
+PLyBackgroundSession_execute(PyObject *self, PyObject *args)
+{
+	PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self;
+	char	   *query;
+	volatile MemoryContext oldcontext;
+	PyObject   *ret;
+
+	if (!bgsession->bgsession)
+	{
+		PLy_exception_set(PyExc_ValueError, "this background session has already been closed");
+		return NULL;
+	}
+
+	if (!PyArg_ParseTuple(args, "s:execute", &query))
+	{
+		PLy_exception_set(PLy_exc_error, "background session execute expected a query");
+		return NULL;
+	}
+
+	oldcontext = CurrentMemoryContext;
+
+	PG_TRY();
+	{
+		BackgroundSessionResult *result;
+
+		pg_verifymbstr(query, strlen(query), false);
+		result = BackgroundSessionExecute(bgsession->bgsession, query);
+		ret = PLy_execute_fetch_result(result->tuples, result->tupdesc, result->ntuples, 0);
+	}
+	PG_CATCH();
+	{
+		PLy_bgsession_handle_error(oldcontext);
+		return NULL;
+	}
+	PG_END_TRY();
+
+	return ret;
+}
+
+// XXX lots of overlap with PLy_spi_prepare
+static PyObject *
+PLyBackgroundSession_prepare(PyObject *self, PyObject *args)
+{
+	PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self;
+	char	   *query;
+	PyObject   *paraminfo = NULL;
+	int			nargs = 0;
+	const char **argnames = NULL;
+	PyObject   *keys;
+	PLyPlanObject *plan;
+	volatile MemoryContext oldcontext;
+	PyObject   *volatile optr = NULL;
+
+	if (!bgsession->bgsession)
+	{
+		PLy_exception_set(PyExc_ValueError, "this background session has already been closed");
+		return NULL;
+	}
+
+	if (!PyArg_ParseTuple(args, "s|O:prepare", &query, &paraminfo))
+		return NULL;
+
+	if (paraminfo &&
+		!PySequence_Check(paraminfo) && !PyMapping_Check(paraminfo))
+	{
+		PLy_exception_set(PyExc_TypeError,
+						  "second argument of prepare must be a sequence or mapping");
+		return NULL;
+	}
+
+	if ((plan = (PLyPlanObject *) PLy_plan_new()) == NULL)
+		return NULL;
+
+	plan->mcxt = AllocSetContextCreate(TopMemoryContext,
+									   "PL/Python background plan context",
+									   ALLOCSET_DEFAULT_MINSIZE,
+									   ALLOCSET_DEFAULT_INITSIZE,
+									   ALLOCSET_DEFAULT_MAXSIZE);
+
+	oldcontext = MemoryContextSwitchTo(plan->mcxt);
+
+	if (!paraminfo)
+		nargs = 0;
+	else if (PySequence_Check(paraminfo))
+		nargs = PySequence_Length(paraminfo);
+	else
+		nargs = PyMapping_Length(paraminfo);
+
+	plan->nargs = nargs;
+	plan->types = nargs ? palloc(sizeof(Oid) * nargs) : NULL;
+	plan->values = nargs ? palloc(sizeof(Datum) * nargs) : NULL;
+	plan->args = nargs ? palloc(sizeof(PLyTypeInfo) * nargs) : NULL;
+
+	if (PyMapping_Check(paraminfo))
+	{
+		argnames = palloc(nargs * sizeof(char *));
+		keys = PyMapping_Keys(paraminfo);
+	}
+	else
+	{
+		argnames = NULL;
+		keys = NULL;
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+
+	oldcontext = CurrentMemoryContext;
+
+	PG_TRY();
+	{
+		int			i;
+		PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+
+		for (i = 0; i < nargs; i++)
+		{
+			PLy_typeinfo_init(&plan->args[i], plan->mcxt);
+			plan->values[i] = PointerGetDatum(NULL);
+		}
+
+		for (i = 0; i < nargs; i++)
+		{
+			char	   *sptr;
+			HeapTuple	typeTup;
+			Oid			typeId;
+			int32		typmod;
+
+			if (keys)
+			{
+				PyObject *key;
+				char *keystr;
+
+				key = PySequence_GetItem(keys, i);
+				argnames[i] = keystr = PyString_AsString(key);
+				optr = PyMapping_GetItemString(paraminfo, keystr);
+				Py_DECREF(key);
+			}
+			else
+				optr = PySequence_GetItem(paraminfo, i);
+
+			if (PyString_Check(optr))
+				sptr = PyString_AsString(optr);
+			else if (PyUnicode_Check(optr))
+				sptr = PLyUnicode_AsString(optr);
+			else
+			{
+				ereport(ERROR,
+						(errmsg("background session prepare: type name at ordinal position %d is not a string", i)));
+				sptr = NULL;	/* keep compiler quiet */
+			}
+
+			/********************************************************
+			 * Resolve argument type names and then look them up by
+			 * oid in the system cache, and remember the required
+			 *information for input conversion.
+			 ********************************************************/
+
+			parseTypeString(sptr, &typeId, &typmod, false);
+
+			typeTup = SearchSysCache1(TYPEOID,
+									  ObjectIdGetDatum(typeId));
+			if (!HeapTupleIsValid(typeTup))
+				elog(ERROR, "cache lookup failed for type %u", typeId);
+
+			Py_DECREF(optr);
+
+			/*
+			 * set optr to NULL, so we won't try to unref it again in case of
+			 * an error
+			 */
+			optr = NULL;
+
+			plan->types[i] = typeId;
+			PLy_output_datum_func(&plan->args[i], typeTup, exec_ctx->curr_proc->langid, exec_ctx->curr_proc->trftypes);
+			ReleaseSysCache(typeTup);
+		}
+
+		pg_verifymbstr(query, strlen(query), false);
+		plan->bgstmt = BackgroundSessionPrepare(bgsession->bgsession, query, nargs, plan->types, argnames);
+	}
+	PG_CATCH();
+	{
+		Py_DECREF(plan);
+		Py_XDECREF(optr);
+
+		PLy_bgsession_handle_error(oldcontext);
+		return NULL;
+	}
+	PG_END_TRY();
+
+	Assert(plan->bgstmt != NULL);
+	return (PyObject *) plan;
+}
+
+static PyObject *
+PLyBackgroundSession_execute_prepared(PyObject *self, PyObject *args)
+{
+	PLyBackgroundSession_Object *bgsession pg_attribute_unused() = (PLyBackgroundSession_Object *) self;
+	PyObject   *ob;
+	PLyPlanObject *plan;
+	PyObject   *list = NULL;
+	int			nargs;
+	volatile MemoryContext oldcontext;
+	int			i;
+	PyObject   *ret;
+
+	if (!bgsession->bgsession)
+	{
+		PLy_exception_set(PyExc_ValueError, "this background session has already been closed");
+		return NULL;
+	}
+
+	if (!PyArg_ParseTuple(args, "O|O:execute_prepared", &ob, &list))
+		return NULL;
+
+	if (!is_PLyPlanObject(ob))
+	{
+		PLy_exception_set(PyExc_TypeError,
+						  "first argument of execute_prepared must be a plan");
+		return NULL;
+	}
+
+	plan = (PLyPlanObject *) ob;
+
+	if (list && (!PySequence_Check(list)))
+	{
+		PLy_exception_set(PyExc_TypeError,
+						  "second argument of execute_prepared must be a sequence");
+		return NULL;
+	}
+
+	nargs = list ? PySequence_Length(list) : 0;
+
+	if (nargs != plan->nargs)
+	{
+		char	   *sv;
+		PyObject   *so = PyObject_Str(list);
+
+		if (!so)
+			PLy_elog(ERROR, "could not execute plan");
+		sv = PyString_AsString(so);
+		PLy_exception_set_plural(PyExc_TypeError,
+							  "Expected sequence of %d argument, got %d: %s",
+							 "Expected sequence of %d arguments, got %d: %s",
+								 plan->nargs,
+								 plan->nargs, nargs, sv);
+		Py_DECREF(so);
+
+		return NULL;
+	}
+
+	oldcontext = CurrentMemoryContext;
+
+	PG_TRY();
+	{
+		bool	   *nulls;
+		BackgroundSessionResult *result;
+
+		nulls = palloc(nargs * sizeof(*nulls));
+
+		for (i = 0; i < nargs; i++)
+		{
+			PyObject   *elem;
+
+			elem = PySequence_GetItem(list, i);
+			if (elem != Py_None)
+			{
+				PG_TRY();
+				{
+					plan->values[i] =
+						plan->args[i].out.d.func(&(plan->args[i].out.d),
+												 -1,
+												 elem,
+												 false);
+				}
+				PG_CATCH();
+				{
+					Py_DECREF(elem);
+					PG_RE_THROW();
+				}
+				PG_END_TRY();
+
+				Py_DECREF(elem);
+				nulls[i] = false;
+			}
+			else
+			{
+				Py_DECREF(elem);
+				plan->values[i] =
+					InputFunctionCall(&(plan->args[i].out.d.typfunc),
+									  NULL,
+									  plan->args[i].out.d.typioparam,
+									  -1);
+				nulls[i] = true;
+			}
+		}
+
+		result = BackgroundSessionExecutePrepared(plan->bgstmt, nargs, plan->values, nulls);
+		pfree(nulls);
+		ret = PLy_execute_fetch_result(result->tuples, result->tupdesc, result->ntuples, 0);
+	}
+	PG_CATCH();
+	{
+		int			k;
+
+		/*
+		 * cleanup plan->values array
+		 */
+		for (k = 0; k < nargs; k++)
+		{
+			if (!plan->args[k].out.d.typbyval &&
+				(plan->values[k] != PointerGetDatum(NULL)))
+			{
+				pfree(DatumGetPointer(plan->values[k]));
+				plan->values[k] = PointerGetDatum(NULL);
+			}
+		}
+
+		PLy_bgsession_handle_error(oldcontext);
+		return NULL;
+	}
+	PG_END_TRY();
+
+	for (i = 0; i < nargs; i++)
+	{
+		if (!plan->args[i].out.d.typbyval &&
+			(plan->values[i] != PointerGetDatum(NULL)))
+		{
+			pfree(DatumGetPointer(plan->values[i]));
+			plan->values[i] = PointerGetDatum(NULL);
+		}
+	}
+
+	return ret;
+}
diff --git a/src/pl/plpython/plpy_bgsession.h b/src/pl/plpython/plpy_bgsession.h
new file mode 100644
index 0000000000..39daca2d39
--- /dev/null
+++ b/src/pl/plpython/plpy_bgsession.h
@@ -0,0 +1,18 @@
+/*
+ * src/pl/plpython/plpy_bgsession.h
+ */
+
+#ifndef PLPY_BGSESSION_H
+#define PLPY_BGSESSION_H
+
+#include "tcop/bgsession.h"
+
+typedef struct PLyBackgroundSession_Object
+{
+	PyObject_HEAD
+	BackgroundSession *bgsession;
+} PLyBackgroundSession_Object;
+
+extern int PLy_bgsession_init_type(PyObject *module);
+
+#endif   /* PLPY_BGSESSION_H */
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4323..b7ee34c3f2 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -7,6 +7,8 @@
 
 #include "plpy_procedure.h"
 
+#include "tcop/bgsession.h"
+
 /* the interpreter's globals dict */
 extern PyObject *PLy_interp_globals;
 
@@ -19,6 +21,7 @@ typedef struct PLyExecutionContext
 {
 	PLyProcedure *curr_proc;	/* the currently executing procedure */
 	MemoryContext scratch_ctx;	/* a context for things like type I/O */
+	BackgroundSession *bgsession;
 	struct PLyExecutionContext *next;	/* previous stack level */
 } PLyExecutionContext;
 
diff --git a/src/pl/plpython/plpy_planobject.c b/src/pl/plpython/plpy_planobject.c
index 16c39a05dd..006a3afd80 100644
--- a/src/pl/plpython/plpy_planobject.c
+++ b/src/pl/plpython/plpy_planobject.c
@@ -77,6 +77,7 @@ PLy_plan_new(void)
 		return NULL;
 
 	ob->plan = NULL;
+	ob->bgstmt = NULL;
 	ob->nargs = 0;
 	ob->types = NULL;
 	ob->values = NULL;
diff --git a/src/pl/plpython/plpy_planobject.h b/src/pl/plpython/plpy_planobject.h
index c67559266e..fba814ad8a 100644
--- a/src/pl/plpython/plpy_planobject.h
+++ b/src/pl/plpython/plpy_planobject.h
@@ -6,6 +6,7 @@
 #define PLPY_PLANOBJECT_H
 
 #include "executor/spi.h"
+#include "tcop/bgsession.h"
 #include "plpy_typeio.h"
 
 
@@ -13,6 +14,7 @@ typedef struct PLyPlanObject
 {
 	PyObject_HEAD
 	SPIPlanPtr	plan;
+	BackgroundSessionPreparedStatement *bgstmt;
 	int			nargs;
 	Oid		   *types;
 	Datum	   *values;
diff --git a/src/pl/plpython/plpy_plpymodule.c b/src/pl/plpython/plpy_plpymodule.c
index 761534406d..500c0931f9 100644
--- a/src/pl/plpython/plpy_plpymodule.c
+++ b/src/pl/plpython/plpy_plpymodule.c
@@ -13,6 +13,7 @@
 
 #include "plpy_plpymodule.h"
 
+#include "plpy_bgsession.h"
 #include "plpy_cursorobject.h"
 #include "plpy_elog.h"
 #include "plpy_planobject.h"
@@ -137,6 +138,8 @@ PyInit_plpy(void)
 		return NULL;
 
 	PLy_add_exceptions(m);
+	if (PLy_bgsession_init_type(plpy) < 0)
+		return NULL;
 
 	return m;
 }
@@ -167,6 +170,8 @@ PLy_init_plpy(void)
 #else
 	plpy = Py_InitModule("plpy", PLy_methods);
 	PLy_add_exceptions(plpy);
+	if (PLy_bgsession_init_type(plpy) < 0)
+		PLy_elog(ERROR, "could not initialize BackgroundSession type");
 #endif
 
 	/* PyDict_SetItemString(plpy, "PlanType", (PyObject *) &PLy_PlanType); */
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index 07ab6a087e..97b4ba05f9 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -31,8 +31,6 @@
 
 static PyObject *PLy_spi_execute_query(char *query, long limit);
 static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
-							 uint64 rows, int status);
 static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
 
 
@@ -291,7 +289,10 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 
 		rv = SPI_execute_plan(plan->plan, plan->values, nulls,
 							  exec_ctx->curr_proc->fn_readonly, limit);
-		ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+		ret = PLy_execute_fetch_result(SPI_tuptable ? SPI_tuptable->vals : NULL,
+									   SPI_tuptable ? SPI_tuptable->tupdesc : NULL,
+									   SPI_processed, rv);
+		SPI_freetuptable(SPI_tuptable);
 
 		if (nargs > 0)
 			pfree(nulls);
@@ -360,7 +361,10 @@ PLy_spi_execute_query(char *query, long limit)
 
 		pg_verifymbstr(query, strlen(query), false);
 		rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
-		ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+		ret = PLy_execute_fetch_result(SPI_tuptable ? SPI_tuptable->vals : NULL,
+									   SPI_tuptable ? SPI_tuptable->tupdesc : 0,
+									   SPI_processed, rv);
+		SPI_freetuptable(SPI_tuptable);
 
 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
 	}
@@ -383,8 +387,8 @@ PLy_spi_execute_query(char *query, long limit)
 	return ret;
 }
 
-static PyObject *
-PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
+PyObject *
+PLy_execute_fetch_result(HeapTuple *tuples, TupleDesc tupdesc, uint64 rows, int status)
 {
 	PLyResultObject *result;
 	volatile MemoryContext oldcontext;
@@ -393,14 +397,14 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
 	Py_DECREF(result->status);
 	result->status = PyInt_FromLong(status);
 
-	if (status > 0 && tuptable == NULL)
+	if (status >= 0 && tuples == NULL)
 	{
 		Py_DECREF(result->nrows);
 		result->nrows = (rows > (uint64) LONG_MAX) ?
 			PyFloat_FromDouble((double) rows) :
 			PyInt_FromLong((long) rows);
 	}
-	else if (status > 0 && tuptable != NULL)
+	else if (status >= 0 && tuples != NULL)
 	{
 		PLyTypeInfo args;
 		MemoryContext cxt;
@@ -437,12 +441,12 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
 				Py_DECREF(result->rows);
 				result->rows = PyList_New(rows);
 
-				PLy_input_tuple_funcs(&args, tuptable->tupdesc);
+				PLy_input_tuple_funcs(&args, tupdesc);
 				for (i = 0; i < rows; i++)
 				{
 					PyObject   *row = PLyDict_FromTuple(&args,
-														tuptable->vals[i],
-														tuptable->tupdesc);
+														tuples[i],
+														tupdesc);
 
 					PyList_SetItem(result->rows, i, row);
 				}
@@ -457,7 +461,7 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
 			 * leaked due to errors.)
 			 */
 			oldcontext2 = MemoryContextSwitchTo(TopMemoryContext);
-			result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc);
+			result->tupdesc = CreateTupleDescCopy(tupdesc);
 			MemoryContextSwitchTo(oldcontext2);
 		}
 		PG_CATCH();
@@ -470,7 +474,6 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
 		PG_END_TRY();
 
 		MemoryContextDelete(cxt);
-		SPI_freetuptable(tuptable);
 	}
 
 	return (PyObject *) result;
diff --git a/src/pl/plpython/plpy_spi.h b/src/pl/plpython/plpy_spi.h
index b0427947ef..268c539838 100644
--- a/src/pl/plpython/plpy_spi.h
+++ b/src/pl/plpython/plpy_spi.h
@@ -5,12 +5,15 @@
 #ifndef PLPY_SPI_H
 #define PLPY_SPI_H
 
+#include "executor/spi.h"
 #include "utils/palloc.h"
 #include "utils/resowner.h"
 
 extern PyObject *PLy_spi_prepare(PyObject *self, PyObject *args);
 extern PyObject *PLy_spi_execute(PyObject *self, PyObject *args);
 
+extern PyObject *PLy_execute_fetch_result(HeapTuple *tuples, TupleDesc tupdesc, uint64 rows, int status);
+
 typedef struct PLyExceptionEntry
 {
 	int			sqlstate;		/* hash key, must be first */
diff --git a/src/pl/plpython/sql/plpython_bgsession.sql b/src/pl/plpython/sql/plpython_bgsession.sql
new file mode 100644
index 0000000000..7a801ab727
--- /dev/null
+++ b/src/pl/plpython/sql/plpython_bgsession.sql
@@ -0,0 +1,177 @@
+CREATE TABLE test1 (a int, b text);
+
+
+-- test of independent commit/rollback
+
+CREATE FUNCTION bgsession_test1() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    for i in range(0, 10):
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+        if i % 2 == 0:
+            a.execute("COMMIT")
+        else:
+            a.execute("ROLLBACK")
+$$;
+
+SELECT bgsession_test1();
+
+SELECT * FROM test1;
+
+
+-- test query result
+
+CREATE FUNCTION bgsession_test2() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (11)")
+        rv = a.execute("SELECT * FROM test1")
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+$$;
+
+SELECT bgsession_test2();
+
+SELECT * FROM test1;
+
+
+-- test notice and error forwarding
+
+CREATE FUNCTION bgsession_test3() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$")
+    a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$")
+$$;
+
+SELECT bgsession_test3();
+
+
+-- test prohibit changing client encoding
+
+CREATE FUNCTION bgsession_test4() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    a.execute("SET client_encoding TO SJIS")
+$$;
+
+SELECT bgsession_test4();
+
+
+-- test prepared statements
+
+CREATE FUNCTION bgsession_test5() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"])
+    a.execute_prepared(plan, [1, "one"])
+    a.execute_prepared(plan, [2, "two"])
+$$;
+
+TRUNCATE test1;
+
+SELECT bgsession_test5();
+
+SELECT * FROM test1;
+
+
+-- test result from prepared query
+
+CREATE FUNCTION bgsession_test7() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+        plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"])
+        a.execute_prepared(plan, [11])
+        plan = a.prepare("SELECT * FROM test1")
+        rv = a.execute_prepared(plan, [])
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+$$;
+
+TRUNCATE test1;
+
+SELECT bgsession_test7();
+
+SELECT * FROM test1;
+
+
+-- test error when not closing transaction
+
+CREATE FUNCTION bgsession_test8() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+$$;
+
+SELECT bgsession_test8();
+
+
+-- test saving session
+
+CREATE FUNCTION bgsession_test9a() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+bg = plpy.BackgroundSession()
+GD['bg'] = bg
+bg.execute("BEGIN")
+bg.execute("INSERT INTO test1 VALUES (1)")
+
+return 1
+$$;
+
+CREATE FUNCTION bgsession_test9b() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+bg = GD['bg']
+bg.execute("INSERT INTO test1 VALUES (2)")
+bg.execute("COMMIT")
+bg.close()
+
+return 2
+$$;
+
+TRUNCATE test1;
+
+SELECT bgsession_test9a();
+SELECT bgsession_test9b();
+
+SELECT * FROM test1;
+
+
+-- test error handling
+
+CREATE FUNCTION bgsession_test10() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as bg:
+    try:
+        bg.execute("SELECT error")
+    except Exception, ex:
+        plpy.notice("caught exception: %s" % ex)
+
+    try:
+        bg.prepare("SELECT error")
+    except Exception, ex:
+        plpy.notice("caught exception: %s" % ex)
+
+    try:
+        plan = bg.prepare("SELECT $1", ["int4"])
+        bg.execute_prepared(plan, ["foo"])
+    except plpy.Error, ex:
+        plpy.notice("caught exception: %s" % ex)
+$$;
+
+SELECT bgsession_test10();
+
+
+DROP TABLE test1;
-- 
2.12.0

>From 2e1fc0b0c50452bac91461a2317c28a8718fe89f Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Sun, 25 Dec 2016 12:00:00 -0500
Subject: [PATCH v2 1/2] dblink: Replace some macros by static functions

Also remove some unused code and the no longer useful dblink.h file.
---
 contrib/dblink/dblink.c | 290 +++++++++++++++++++++++-------------------------
 contrib/dblink/dblink.h |  39 -------
 2 files changed, 137 insertions(+), 192 deletions(-)
 delete mode 100644 contrib/dblink/dblink.h

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index e0d6778a08..67d6699066 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -61,8 +61,6 @@
 #include "utils/tqual.h"
 #include "utils/varlena.h"
 
-#include "dblink.h"
-
 PG_MODULE_MAGIC;
 
 typedef struct remoteConn
@@ -146,98 +144,102 @@ typedef struct remoteConnHashEnt
 /* initial number of connection hashes */
 #define NUMCONN 16
 
-/* general utility */
-#define xpfree(var_) \
-	do { \
-		if (var_ != NULL) \
-		{ \
-			pfree(var_); \
-			var_ = NULL; \
-		} \
-	} while (0)
-
-#define xpstrdup(var_c, var_) \
-	do { \
-		if (var_ != NULL) \
-			var_c = pstrdup(var_); \
-		else \
-			var_c = NULL; \
-	} while (0)
-
-#define DBLINK_RES_INTERNALERROR(p2) \
-	do { \
-			msg = pchomp(PQerrorMessage(conn)); \
-			if (res) \
-				PQclear(res); \
-			elog(ERROR, "%s: %s", p2, msg); \
-	} while (0)
-
-#define DBLINK_CONN_NOT_AVAIL \
-	do { \
-		if(conname) \
-			ereport(ERROR, \
-					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
-					 errmsg("connection \"%s\" not available", conname))); \
-		else \
-			ereport(ERROR, \
-					(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \
-					 errmsg("connection not available"))); \
-	} while (0)
-
-#define DBLINK_GET_CONN \
-	do { \
-			char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
-			rconn = getConnectionByName(conname_or_str); \
-			if (rconn) \
-			{ \
-				conn = rconn->conn; \
-				conname = conname_or_str; \
-			} \
-			else \
-			{ \
-				connstr = get_connect_string(conname_or_str); \
-				if (connstr == NULL) \
-				{ \
-					connstr = conname_or_str; \
-				} \
-				dblink_connstr_check(connstr); \
-				conn = PQconnectdb(connstr); \
-				if (PQstatus(conn) == CONNECTION_BAD) \
-				{ \
-					msg = pchomp(PQerrorMessage(conn)); \
-					PQfinish(conn); \
-					ereport(ERROR, \
-							(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \
-							 errmsg("could not establish connection"), \
-							 errdetail_internal("%s", msg))); \
-				} \
-				dblink_security_check(conn, rconn); \
-				if (PQclientEncoding(conn) != GetDatabaseEncoding()) \
-					PQsetClientEncoding(conn, GetDatabaseEncodingName()); \
-				freeconn = true; \
-			} \
-	} while (0)
-
-#define DBLINK_GET_NAMED_CONN \
-	do { \
-			conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \
-			rconn = getConnectionByName(conname); \
-			if (rconn) \
-				conn = rconn->conn; \
-			else \
-				DBLINK_CONN_NOT_AVAIL; \
-	} while (0)
-
-#define DBLINK_INIT \
-	do { \
-			if (!pconn) \
-			{ \
-				pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
-				pconn->conn = NULL; \
-				pconn->openCursorCount = 0; \
-				pconn->newXactForCursor = FALSE; \
-			} \
-	} while (0)
+static char *
+xpstrdup(const char *in)
+{
+	if (in == NULL)
+		return NULL;
+	return pstrdup(in);
+}
+
+static void
+dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
+{
+	char	   *msg = pchomp(PQerrorMessage(conn));
+	if (res)
+		PQclear(res);
+	elog(ERROR, "%s: %s", p2, msg);
+}
+
+static void pg_attribute_noreturn()
+dblink_conn_not_avail(const char *conname)
+{
+	if (conname)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+				 errmsg("connection \"%s\" not available", conname)));
+	else
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
+				 errmsg("connection not available")));
+}
+
+static void
+dblink_get_conn(char *conname_or_str,
+				PGconn * volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
+{
+	remoteConn *rconn = getConnectionByName(conname_or_str);
+	PGconn	   *conn;
+	char	   *conname;
+	bool		freeconn;
+
+	if (rconn)
+	{
+		conn = rconn->conn;
+		conname = conname_or_str;
+		freeconn = false;
+	}
+	else
+	{
+		const char *connstr;
+
+		connstr = get_connect_string(conname_or_str);
+		if (connstr == NULL)
+			connstr = conname_or_str;
+		dblink_connstr_check(connstr);
+		conn = PQconnectdb(connstr);
+		if (PQstatus(conn) == CONNECTION_BAD)
+		{
+			char	   *msg = pchomp(PQerrorMessage(conn));
+			PQfinish(conn);
+			ereport(ERROR,
+					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+					 errmsg("could not establish connection"),
+					 errdetail_internal("%s", msg)));
+		}
+		dblink_security_check(conn, rconn);
+		if (PQclientEncoding(conn) != GetDatabaseEncoding())
+			PQsetClientEncoding(conn, GetDatabaseEncodingName());
+		freeconn = true;
+		conname = NULL;
+	}
+
+	*conn_p = conn;
+	*conname_p = conname;
+	*freeconn_p = freeconn;
+}
+
+static PGconn *
+dblink_get_named_conn(const char *conname)
+{
+	remoteConn *rconn = getConnectionByName(conname);
+	if (rconn)
+		return rconn->conn;
+	else
+		dblink_conn_not_avail(conname);
+}
+
+static void
+dblink_init(void)
+{
+	if (!pconn)
+	{
+		pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
+		pconn->conn = NULL;
+		pconn->openCursorCount = 0;
+		pconn->newXactForCursor = FALSE;
+	}
+}
 
 /*
  * Create a persistent connection to another database
@@ -253,7 +255,7 @@ dblink_connect(PG_FUNCTION_ARGS)
 	PGconn	   *conn = NULL;
 	remoteConn *rconn = NULL;
 
-	DBLINK_INIT;
+	dblink_init();
 
 	if (PG_NARGS() == 2)
 	{
@@ -318,7 +320,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
 	remoteConn *rconn = NULL;
 	PGconn	   *conn = NULL;
 
-	DBLINK_INIT;
+	dblink_init();
 
 	if (PG_NARGS() == 1)
 	{
@@ -331,7 +333,7 @@ dblink_disconnect(PG_FUNCTION_ARGS)
 		conn = pconn->conn;
 
 	if (!conn)
-		DBLINK_CONN_NOT_AVAIL;
+		dblink_conn_not_avail(conname);
 
 	PQfinish(conn);
 	if (rconn)
@@ -352,7 +354,6 @@ PG_FUNCTION_INFO_V1(dblink_open);
 Datum
 dblink_open(PG_FUNCTION_ARGS)
 {
-	char	   *msg;
 	PGresult   *res = NULL;
 	PGconn	   *conn = NULL;
 	char	   *curname = NULL;
@@ -362,7 +363,7 @@ dblink_open(PG_FUNCTION_ARGS)
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible behavior */
 
-	DBLINK_INIT;
+	dblink_init();
 	initStringInfo(&buf);
 
 	if (PG_NARGS() == 2)
@@ -401,7 +402,7 @@ dblink_open(PG_FUNCTION_ARGS)
 	}
 
 	if (!rconn || !rconn->conn)
-		DBLINK_CONN_NOT_AVAIL;
+		dblink_conn_not_avail(conname);
 	else
 		conn = rconn->conn;
 
@@ -410,7 +411,7 @@ dblink_open(PG_FUNCTION_ARGS)
 	{
 		res = PQexec(conn, "BEGIN");
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
-			DBLINK_RES_INTERNALERROR("begin error");
+			dblink_res_internalerror(conn, res, "begin error");
 		PQclear(res);
 		rconn->newXactForCursor = TRUE;
 
@@ -450,11 +451,10 @@ dblink_close(PG_FUNCTION_ARGS)
 	char	   *curname = NULL;
 	char	   *conname = NULL;
 	StringInfoData buf;
-	char	   *msg;
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible behavior */
 
-	DBLINK_INIT;
+	dblink_init();
 	initStringInfo(&buf);
 
 	if (PG_NARGS() == 1)
@@ -489,7 +489,7 @@ dblink_close(PG_FUNCTION_ARGS)
 	}
 
 	if (!rconn || !rconn->conn)
-		DBLINK_CONN_NOT_AVAIL;
+		dblink_conn_not_avail(conname);
 	else
 		conn = rconn->conn;
 
@@ -517,7 +517,7 @@ dblink_close(PG_FUNCTION_ARGS)
 
 			res = PQexec(conn, "COMMIT");
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
-				DBLINK_RES_INTERNALERROR("commit error");
+				dblink_res_internalerror(conn, res, "commit error");
 			PQclear(res);
 		}
 	}
@@ -543,7 +543,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 
 	prepTuplestoreResult(fcinfo);
 
-	DBLINK_INIT;
+	dblink_init();
 
 	if (PG_NARGS() == 4)
 	{
@@ -587,7 +587,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 	}
 
 	if (!conn)
-		DBLINK_CONN_NOT_AVAIL;
+		dblink_conn_not_avail(conname);
 
 	initStringInfo(&buf);
 	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
@@ -633,15 +633,13 @@ PG_FUNCTION_INFO_V1(dblink_send_query);
 Datum
 dblink_send_query(PG_FUNCTION_ARGS)
 {
-	char	   *conname = NULL;
-	PGconn	   *conn = NULL;
-	char	   *sql = NULL;
-	remoteConn *rconn = NULL;
+	PGconn	   *conn;
+	char	   *sql;
 	int			retval;
 
 	if (PG_NARGS() == 2)
 	{
-		DBLINK_GET_NAMED_CONN;
+		conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 		sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 	}
 	else
@@ -671,15 +669,12 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 
 	prepTuplestoreResult(fcinfo);
 
-	DBLINK_INIT;
+	dblink_init();
 
 	PG_TRY();
 	{
-		char	   *msg;
-		char	   *connstr = NULL;
 		char	   *sql = NULL;
 		char	   *conname = NULL;
-		remoteConn *rconn = NULL;
 		bool		fail = true;	/* default to backward compatible */
 
 		if (!is_async)
@@ -687,7 +682,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 			if (PG_NARGS() == 3)
 			{
 				/* text,text,bool */
-				DBLINK_GET_CONN;
+				dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 				fail = PG_GETARG_BOOL(2);
 			}
@@ -702,7 +697,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 				}
 				else
 				{
-					DBLINK_GET_CONN;
+					dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
 					sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 				}
 			}
@@ -722,13 +717,13 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 			if (PG_NARGS() == 2)
 			{
 				/* text,bool */
-				DBLINK_GET_NAMED_CONN;
+				conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 				fail = PG_GETARG_BOOL(1);
 			}
 			else if (PG_NARGS() == 1)
 			{
 				/* text */
-				DBLINK_GET_NAMED_CONN;
+				conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 			}
 			else
 				/* shouldn't happen */
@@ -736,7 +731,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 		}
 
 		if (!conn)
-			DBLINK_CONN_NOT_AVAIL;
+			dblink_conn_not_avail(conname);
 
 		if (!is_async)
 		{
@@ -1297,12 +1292,10 @@ PG_FUNCTION_INFO_V1(dblink_is_busy);
 Datum
 dblink_is_busy(PG_FUNCTION_ARGS)
 {
-	char	   *conname = NULL;
-	PGconn	   *conn = NULL;
-	remoteConn *rconn = NULL;
+	PGconn	   *conn;
 
-	DBLINK_INIT;
-	DBLINK_GET_NAMED_CONN;
+	dblink_init();
+	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 
 	PQconsumeInput(conn);
 	PG_RETURN_INT32(PQisBusy(conn));
@@ -1323,15 +1316,13 @@ PG_FUNCTION_INFO_V1(dblink_cancel_query);
 Datum
 dblink_cancel_query(PG_FUNCTION_ARGS)
 {
-	int			res = 0;
-	char	   *conname = NULL;
-	PGconn	   *conn = NULL;
-	remoteConn *rconn = NULL;
+	int			res;
+	PGconn	   *conn;
 	PGcancel   *cancel;
 	char		errbuf[256];
 
-	DBLINK_INIT;
-	DBLINK_GET_NAMED_CONN;
+	dblink_init();
+	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 	cancel = PQgetCancel(conn);
 
 	res = PQcancel(cancel, errbuf, 256);
@@ -1359,12 +1350,10 @@ Datum
 dblink_error_message(PG_FUNCTION_ARGS)
 {
 	char	   *msg;
-	char	   *conname = NULL;
-	PGconn	   *conn = NULL;
-	remoteConn *rconn = NULL;
+	PGconn	   *conn;
 
-	DBLINK_INIT;
-	DBLINK_GET_NAMED_CONN;
+	dblink_init();
+	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 
 	msg = PQerrorMessage(conn);
 	if (msg == NULL || msg[0] == '\0')
@@ -1384,22 +1373,19 @@ dblink_exec(PG_FUNCTION_ARGS)
 	PGconn	   *volatile conn = NULL;
 	volatile bool freeconn = false;
 
-	DBLINK_INIT;
+	dblink_init();
 
 	PG_TRY();
 	{
-		char	   *msg;
 		PGresult   *res = NULL;
-		char	   *connstr = NULL;
 		char	   *sql = NULL;
 		char	   *conname = NULL;
-		remoteConn *rconn = NULL;
 		bool		fail = true;	/* default to backward compatible behavior */
 
 		if (PG_NARGS() == 3)
 		{
 			/* must be text,text,bool */
-			DBLINK_GET_CONN;
+			dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
 			sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 			fail = PG_GETARG_BOOL(2);
 		}
@@ -1414,7 +1400,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 			}
 			else
 			{
-				DBLINK_GET_CONN;
+				dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 			}
 		}
@@ -1429,7 +1415,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 			elog(ERROR, "wrong number of arguments");
 
 		if (!conn)
-			DBLINK_CONN_NOT_AVAIL;
+			dblink_conn_not_avail(conname);
 
 		res = PQexec(conn, sql);
 		if (!res ||
@@ -1880,9 +1866,7 @@ PG_FUNCTION_INFO_V1(dblink_get_notify);
 Datum
 dblink_get_notify(PG_FUNCTION_ARGS)
 {
-	char	   *conname = NULL;
-	PGconn	   *conn = NULL;
-	remoteConn *rconn = NULL;
+	PGconn	   *conn;
 	PGnotify   *notify;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
@@ -1892,9 +1876,9 @@ dblink_get_notify(PG_FUNCTION_ARGS)
 
 	prepTuplestoreResult(fcinfo);
 
-	DBLINK_INIT;
+	dblink_init();
 	if (PG_NARGS() == 1)
-		DBLINK_GET_NAMED_CONN;
+		conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 	else
 		conn = pconn->conn;
 
@@ -2698,10 +2682,10 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
 	else
 		sqlstate = ERRCODE_CONNECTION_FAILURE;
 
-	xpstrdup(message_primary, pg_diag_message_primary);
-	xpstrdup(message_detail, pg_diag_message_detail);
-	xpstrdup(message_hint, pg_diag_message_hint);
-	xpstrdup(message_context, pg_diag_context);
+	message_primary = xpstrdup(pg_diag_message_primary);
+	message_detail = xpstrdup(pg_diag_message_detail);
+	message_hint = xpstrdup(pg_diag_message_hint);
+	message_context = xpstrdup(pg_diag_context);
 
 	/*
 	 * If we don't get a message from the PGresult, try the PGconn.  This
diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h
deleted file mode 100644
index c96dbe28a8..0000000000
--- a/contrib/dblink/dblink.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * dblink.h
- *
- * Functions returning results from a remote database
- *
- * Joe Conway <m...@joeconway.com>
- * And contributors:
- * Darko Prenosil <darko.preno...@finteh.hr>
- * Shridhar Daithankar <shridhar_daithan...@persistent.co.in>
- *
- * contrib/dblink/dblink.h
- * Copyright (c) 2001-2017, PostgreSQL Global Development Group
- * ALL RIGHTS RESERVED;
- *
- * Permission to use, copy, modify, and distribute this software and its
- * documentation for any purpose, without fee, and without a written agreement
- * is hereby granted, provided that the above copyright notice and this
- * paragraph and the following two paragraphs appear in all copies.
- *
- * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
- * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
- * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
- * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
- * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
- * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
- * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
- * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
- *
- */
-
-#ifndef DBLINK_H
-#define DBLINK_H
-
-#include "fmgr.h"
-
-#endif   /* DBLINK_H */
-- 
2.12.0

>From 16f77685f7701790191b4f8d9a3cbce60604d397 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Tue, 28 Feb 2017 14:58:07 -0500
Subject: [PATCH v2 2/2] dblink: Add background sessions support

Add function dblink_connect_self(), which creates a dblink connection
via the background session API, instead of libpq.  After that, it can be
used transparently as a dblink connection, with some restrictions.

This allows the typical use case of using dblink for self-connections
without having to deal with access control issues of a libpq connection.
---
 contrib/dblink/Makefile             |   1 +
 contrib/dblink/dblink--1.2--1.3.sql |  14 +
 contrib/dblink/dblink.c             | 557 ++++++++++++++++++++++--------------
 contrib/dblink/dblink.control       |   2 +-
 contrib/dblink/expected/dblink.out  |  95 ++++++
 contrib/dblink/sql/dblink.sql       |  20 ++
 doc/src/sgml/dblink.sgml            |  37 +++
 7 files changed, 516 insertions(+), 210 deletions(-)
 create mode 100644 contrib/dblink/dblink--1.2--1.3.sql

diff --git a/contrib/dblink/Makefile b/contrib/dblink/Makefile
index 5189758dab..106039da3b 100644
--- a/contrib/dblink/Makefile
+++ b/contrib/dblink/Makefile
@@ -7,6 +7,7 @@ SHLIB_LINK = $(libpq)
 
 EXTENSION = dblink
 DATA = dblink--1.2.sql dblink--1.1--1.2.sql dblink--1.0--1.1.sql \
+	dblink--1.2--1.3.sql \
 	dblink--unpackaged--1.0.sql
 PGFILEDESC = "dblink - connect to other PostgreSQL databases"
 
diff --git a/contrib/dblink/dblink--1.2--1.3.sql b/contrib/dblink/dblink--1.2--1.3.sql
new file mode 100644
index 0000000000..b863613a3c
--- /dev/null
+++ b/contrib/dblink/dblink--1.2--1.3.sql
@@ -0,0 +1,14 @@
+/* contrib/dblink/dblink--1.2--1.3.sql */
+
+-- complain if script is sourced in psql, rather than via ALTER EXTENSION
+\echo Use "ALTER EXTENSION dblink UPDATE TO '1.3'" to load this file. \quit
+
+CREATE FUNCTION dblink_connect_self ()
+RETURNS text
+AS 'MODULE_PATHNAME','dblink_connect_self'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+CREATE FUNCTION dblink_connect_self (text)
+RETURNS text
+AS 'MODULE_PATHNAME','dblink_connect_self'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 67d6699066..6cb1a1e2e6 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -51,6 +51,7 @@
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "parser/scansup.h"
+#include "tcop/bgsession.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -66,6 +67,7 @@ PG_MODULE_MAGIC;
 typedef struct remoteConn
 {
 	PGconn	   *conn;			/* Hold the remote connection */
+	BackgroundSession *bgconn;
 	int			openCursorCount;	/* The number of open cursors */
 	bool		newXactForCursor;		/* Opened a transaction for a cursor */
 } remoteConn;
@@ -89,6 +91,7 @@ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
 static void prepTuplestoreResult(FunctionCallInfo fcinfo);
 static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
 				  PGresult *res);
+static void materializeBgResult(FunctionCallInfo fcinfo, BackgroundSession *bgconn, BackgroundSessionResult *res);
 static void materializeQueryResult(FunctionCallInfo fcinfo,
 					   PGconn *conn,
 					   const char *conname,
@@ -111,7 +114,7 @@ static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumat
 static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
 static char *generate_relation_name(Relation rel);
 static void dblink_connstr_check(const char *connstr);
-static void dblink_security_check(PGconn *conn, remoteConn *rconn);
+static void dblink_security_check(remoteConn *rconn);
 static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
 							 const char *dblink_context_msg, bool fail);
 static char *get_connect_string(const char *servername);
@@ -176,22 +179,21 @@ dblink_conn_not_avail(const char *conname)
 
 static void
 dblink_get_conn(char *conname_or_str,
-				PGconn * volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
+				remoteConn * volatile *rconn_p, char **conname_p, volatile bool *freeconn_p)
 {
 	remoteConn *rconn = getConnectionByName(conname_or_str);
-	PGconn	   *conn;
 	char	   *conname;
 	bool		freeconn;
 
 	if (rconn)
 	{
-		conn = rconn->conn;
 		conname = conname_or_str;
 		freeconn = false;
 	}
 	else
 	{
 		const char *connstr;
+		PGconn	   *conn;
 
 		connstr = get_connect_string(conname_or_str);
 		if (connstr == NULL)
@@ -207,38 +209,52 @@ dblink_get_conn(char *conname_or_str,
 					 errmsg("could not establish connection"),
 					 errdetail_internal("%s", msg)));
 		}
-		dblink_security_check(conn, rconn);
 		if (PQclientEncoding(conn) != GetDatabaseEncoding())
 			PQsetClientEncoding(conn, GetDatabaseEncodingName());
+		rconn = palloc0(sizeof(*rconn));
+		rconn->conn = conn;
 		freeconn = true;
 		conname = NULL;
+		dblink_security_check(rconn);
 	}
 
-	*conn_p = conn;
+	*rconn_p = rconn;
 	*conname_p = conname;
 	*freeconn_p = freeconn;
 }
 
-static PGconn *
+static void
+dblink_finish_conn(remoteConn *rconn)
+{
+	if (rconn->conn)
+	{
+		PQfinish(rconn->conn);
+		rconn->conn = NULL;
+	}
+	if (rconn->bgconn)
+	{
+		BackgroundSessionEnd(rconn->bgconn);
+		rconn->bgconn = NULL;
+	}
+}
+
+static remoteConn *
 dblink_get_named_conn(const char *conname)
 {
 	remoteConn *rconn = getConnectionByName(conname);
 	if (rconn)
-		return rconn->conn;
+		return rconn;
 	else
 		dblink_conn_not_avail(conname);
 }
 
 static void
-dblink_init(void)
+dblink_bgsession_not_supported(const remoteConn *rconn)
 {
-	if (!pconn)
-	{
-		pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
-		pconn->conn = NULL;
-		pconn->openCursorCount = 0;
-		pconn->newXactForCursor = FALSE;
-	}
+	if (!rconn->conn)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("not supported with background session connection")));
 }
 
 /*
@@ -255,8 +271,6 @@ dblink_connect(PG_FUNCTION_ARGS)
 	PGconn	   *conn = NULL;
 	remoteConn *rconn = NULL;
 
-	dblink_init();
-
 	if (PG_NARGS() == 2)
 	{
 		conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
@@ -265,8 +279,7 @@ dblink_connect(PG_FUNCTION_ARGS)
 	else if (PG_NARGS() == 1)
 		conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
 
-	if (connname)
-		rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
+	rconn = (remoteConn *) MemoryContextAllocZero(TopMemoryContext,
 												  sizeof(remoteConn));
 
 	/* first check for valid foreign data server */
@@ -292,19 +305,46 @@ dblink_connect(PG_FUNCTION_ARGS)
 	}
 
 	/* check password actually used if not superuser */
-	dblink_security_check(conn, rconn);
+	dblink_security_check(rconn);
 
 	/* attempt to set client encoding to match server encoding, if needed */
 	if (PQclientEncoding(conn) != GetDatabaseEncoding())
 		PQsetClientEncoding(conn, GetDatabaseEncodingName());
 
+	rconn->conn = conn;
+
 	if (connname)
-	{
-		rconn->conn = conn;
 		createNewConnection(connname, rconn);
-	}
 	else
-		pconn->conn = conn;
+		pconn = rconn;
+
+	PG_RETURN_TEXT_P(cstring_to_text("OK"));
+}
+
+PG_FUNCTION_INFO_V1(dblink_connect_self);
+Datum
+dblink_connect_self(PG_FUNCTION_ARGS)
+{
+	char	   *connname = NULL;
+	BackgroundSession *conn = NULL;
+	remoteConn *rconn = NULL;
+	MemoryContext oldcontext;
+
+	if (PG_NARGS() == 1)
+		connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+	conn = BackgroundSessionStart();
+	MemoryContextSwitchTo(oldcontext);
+
+	rconn = (remoteConn *) MemoryContextAllocZero(TopMemoryContext,
+												  sizeof(remoteConn));
+	rconn->bgconn = conn;
+
+	if (connname)
+		createNewConnection(connname, rconn);
+	else
+		pconn = rconn;
 
 	PG_RETURN_TEXT_P(cstring_to_text("OK"));
 }
@@ -318,31 +358,25 @@ dblink_disconnect(PG_FUNCTION_ARGS)
 {
 	char	   *conname = NULL;
 	remoteConn *rconn = NULL;
-	PGconn	   *conn = NULL;
-
-	dblink_init();
 
 	if (PG_NARGS() == 1)
 	{
 		conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 		rconn = getConnectionByName(conname);
-		if (rconn)
-			conn = rconn->conn;
 	}
 	else
-		conn = pconn->conn;
+		rconn = pconn;
 
-	if (!conn)
+	if (!rconn)
 		dblink_conn_not_avail(conname);
 
-	PQfinish(conn);
-	if (rconn)
-	{
+	dblink_finish_conn(rconn);
+	pfree(rconn);
+
+	if (conname)
 		deleteConnection(conname);
-		pfree(rconn);
-	}
 	else
-		pconn->conn = NULL;
+		pconn = NULL;
 
 	PG_RETURN_TEXT_P(cstring_to_text("OK"));
 }
@@ -363,7 +397,6 @@ dblink_open(PG_FUNCTION_ARGS)
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible behavior */
 
-	dblink_init();
 	initStringInfo(&buf);
 
 	if (PG_NARGS() == 2)
@@ -401,10 +434,12 @@ dblink_open(PG_FUNCTION_ARGS)
 		rconn = getConnectionByName(conname);
 	}
 
-	if (!rconn || !rconn->conn)
+	if (!rconn)
 		dblink_conn_not_avail(conname);
-	else
-		conn = rconn->conn;
+
+	dblink_bgsession_not_supported(rconn);
+
+	conn = rconn->conn;
 
 	/* If we are not in a transaction, start one */
 	if (PQtransactionStatus(conn) == PQTRANS_IDLE)
@@ -454,7 +489,6 @@ dblink_close(PG_FUNCTION_ARGS)
 	remoteConn *rconn = NULL;
 	bool		fail = true;	/* default to backward compatible behavior */
 
-	dblink_init();
 	initStringInfo(&buf);
 
 	if (PG_NARGS() == 1)
@@ -488,10 +522,12 @@ dblink_close(PG_FUNCTION_ARGS)
 		rconn = getConnectionByName(conname);
 	}
 
-	if (!rconn || !rconn->conn)
+	if (!rconn)
 		dblink_conn_not_avail(conname);
-	else
-		conn = rconn->conn;
+
+	dblink_bgsession_not_supported(rconn);
+
+	conn = rconn->conn;
 
 	appendStringInfo(&buf, "CLOSE %s", curname);
 
@@ -543,8 +579,6 @@ dblink_fetch(PG_FUNCTION_ARGS)
 
 	prepTuplestoreResult(fcinfo);
 
-	dblink_init();
-
 	if (PG_NARGS() == 4)
 	{
 		/* text,text,int,bool */
@@ -554,8 +588,6 @@ dblink_fetch(PG_FUNCTION_ARGS)
 		fail = PG_GETARG_BOOL(3);
 
 		rconn = getConnectionByName(conname);
-		if (rconn)
-			conn = rconn->conn;
 	}
 	else if (PG_NARGS() == 3)
 	{
@@ -565,7 +597,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
 			curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 			howmany = PG_GETARG_INT32(1);
 			fail = PG_GETARG_BOOL(2);
-			conn = pconn->conn;
+			rconn = pconn;
 		}
 		else
 		{
@@ -574,8 +606,6 @@ dblink_fetch(PG_FUNCTION_ARGS)
 			howmany = PG_GETARG_INT32(2);
 
 			rconn = getConnectionByName(conname);
-			if (rconn)
-				conn = rconn->conn;
 		}
 	}
 	else if (PG_NARGS() == 2)
@@ -583,12 +613,16 @@ dblink_fetch(PG_FUNCTION_ARGS)
 		/* text,int */
 		curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
 		howmany = PG_GETARG_INT32(1);
-		conn = pconn->conn;
+		rconn = pconn;
 	}
 
-	if (!conn)
+	if (!rconn)
 		dblink_conn_not_avail(conname);
 
+	dblink_bgsession_not_supported(rconn);
+
+	conn = rconn->conn;
+
 	initStringInfo(&buf);
 	appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
 
@@ -633,23 +667,30 @@ PG_FUNCTION_INFO_V1(dblink_send_query);
 Datum
 dblink_send_query(PG_FUNCTION_ARGS)
 {
-	PGconn	   *conn;
+	remoteConn *rconn;
 	char	   *sql;
 	int			retval;
 
 	if (PG_NARGS() == 2)
 	{
-		conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+		rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 		sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 	}
 	else
 		/* shouldn't happen */
 		elog(ERROR, "wrong number of arguments");
 
-	/* async query send */
-	retval = PQsendQuery(conn, sql);
-	if (retval != 1)
-		elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
+	if (rconn->conn)
+	{
+		retval = PQsendQuery(rconn->conn, sql);
+		if (retval != 1)
+			elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(rconn->conn)));
+	}
+	else
+	{
+		BackgroundSessionSend(rconn->bgconn, sql);
+		retval = 1;
+	}
 
 	PG_RETURN_INT32(retval);
 }
@@ -664,13 +705,11 @@ dblink_get_result(PG_FUNCTION_ARGS)
 static Datum
 dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 {
-	PGconn	   *volatile conn = NULL;
+	remoteConn *volatile rconn = NULL;
 	volatile bool freeconn = false;
 
 	prepTuplestoreResult(fcinfo);
 
-	dblink_init();
-
 	PG_TRY();
 	{
 		char	   *sql = NULL;
@@ -682,7 +721,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 			if (PG_NARGS() == 3)
 			{
 				/* text,text,bool */
-				dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
+				dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &rconn, &conname, &freeconn);
 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 				fail = PG_GETARG_BOOL(2);
 			}
@@ -691,20 +730,20 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 				/* text,text or text,bool */
 				if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
 				{
-					conn = pconn->conn;
+					rconn = pconn;
 					sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
 					fail = PG_GETARG_BOOL(1);
 				}
 				else
 				{
-					dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
+					dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &rconn, &conname, &freeconn);
 					sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 				}
 			}
 			else if (PG_NARGS() == 1)
 			{
 				/* text */
-				conn = pconn->conn;
+				rconn = pconn;
 				sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
 			}
 			else
@@ -717,61 +756,75 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
 			if (PG_NARGS() == 2)
 			{
 				/* text,bool */
-				conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+				rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 				fail = PG_GETARG_BOOL(1);
 			}
 			else if (PG_NARGS() == 1)
 			{
 				/* text */
-				conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+				rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 			}
 			else
 				/* shouldn't happen */
 				elog(ERROR, "wrong number of arguments");
 		}
 
-		if (!conn)
+		if (!rconn)
 			dblink_conn_not_avail(conname);
 
 		if (!is_async)
 		{
-			/* synchronous query, use efficient tuple collection method */
-			materializeQueryResult(fcinfo, conn, conname, sql, fail);
+			if (rconn->conn)
+				/* synchronous query, use efficient tuple collection method */
+				materializeQueryResult(fcinfo, rconn->conn, conname, sql, fail);
+			else if (rconn->bgconn)
+			{
+				BackgroundSessionResult *res = BackgroundSessionExecute(rconn->bgconn, sql);
+				materializeBgResult(fcinfo, rconn->bgconn, res);
+			}
 		}
 		else
 		{
 			/* async result retrieval, do it the old way */
-			PGresult   *res = PQgetResult(conn);
-
-			/* NULL means we're all done with the async results */
-			if (res)
+			if (rconn->conn)
 			{
-				if (PQresultStatus(res) != PGRES_COMMAND_OK &&
-					PQresultStatus(res) != PGRES_TUPLES_OK)
-				{
-					dblink_res_error(conn, conname, res,
-									 "could not execute query", fail);
-					/* if fail isn't set, we'll return an empty query result */
-				}
-				else
+				PGresult   *res = PQgetResult(rconn->conn);
+
+				/* NULL means we're all done with the async results */
+				if (res)
 				{
-					materializeResult(fcinfo, conn, res);
+					if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+						PQresultStatus(res) != PGRES_TUPLES_OK)
+					{
+						dblink_res_error(rconn->conn, conname, res,
+										 "could not execute query", fail);
+						/* if fail isn't set, we'll return an empty query result */
+					}
+					else
+					{
+						materializeResult(fcinfo, rconn->conn, res);
+					}
 				}
 			}
+			else
+			{
+				BackgroundSessionResult *res = BackgroundSessionGetResult(rconn->bgconn);
+				materializeBgResult(fcinfo, rconn->bgconn, res);
+			}
 		}
 	}
 	PG_CATCH();
 	{
 		/* if needed, close the connection to the database */
 		if (freeconn)
-			PQfinish(conn);
+			dblink_finish_conn(rconn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
 
 	/* if needed, close the connection to the database */
 	if (freeconn)
-		PQfinish(conn);
+		dblink_finish_conn(rconn);
 
 	return (Datum) 0;
 }
@@ -806,6 +859,45 @@ prepTuplestoreResult(FunctionCallInfo fcinfo)
 }
 
 /*
+ * get a tuple descriptor for our result type
+ */
+static TupleDesc
+dblink_get_call_result_type(FunctionCallInfo fcinfo, int nfields)
+{
+	TupleDesc	tupdesc;
+
+	switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+	{
+		case TYPEFUNC_COMPOSITE:
+			/* success */
+			break;
+		case TYPEFUNC_RECORD:
+			/* failed to determine actual type of RECORD */
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("function returning record called in context "
+							"that cannot accept type record")));
+			break;
+		default:
+			/* result type isn't composite */
+			elog(ERROR, "return type must be a row type");
+			break;
+	}
+
+	/*
+	 * check result and tuple descriptor have the same number of columns
+	 */
+	if (nfields != tupdesc->natts)
+		ereport(ERROR,
+				(errcode(ERRCODE_DATATYPE_MISMATCH),
+				 errmsg("remote query result rowtype does not match "
+						"the specified FROM clause rowtype")));
+
+	/* make sure we have a persistent copy of the tupdesc */
+	return CreateTupleDescCopy(tupdesc);
+}
+
+/*
  * Copy the contents of the PGresult into a tuplestore to be returned
  * as the result of the current function.
  * The PGresult will be released in this function.
@@ -844,41 +936,11 @@ materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
 			Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
 
 			is_sql_cmd = false;
-
-			/* get a tuple descriptor for our result type */
-			switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-			{
-				case TYPEFUNC_COMPOSITE:
-					/* success */
-					break;
-				case TYPEFUNC_RECORD:
-					/* failed to determine actual type of RECORD */
-					ereport(ERROR,
-							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("function returning record called in context "
-							   "that cannot accept type record")));
-					break;
-				default:
-					/* result type isn't composite */
-					elog(ERROR, "return type must be a row type");
-					break;
-			}
-
-			/* make sure we have a persistent copy of the tupdesc */
-			tupdesc = CreateTupleDescCopy(tupdesc);
 			ntuples = PQntuples(res);
 			nfields = PQnfields(res);
+			tupdesc = dblink_get_call_result_type(fcinfo, nfields);
 		}
 
-		/*
-		 * check result and tuple descriptor have the same number of columns
-		 */
-		if (nfields != tupdesc->natts)
-			ereport(ERROR,
-					(errcode(ERRCODE_DATATYPE_MISMATCH),
-					 errmsg("remote query result rowtype does not match "
-							"the specified FROM clause rowtype")));
-
 		if (ntuples > 0)
 		{
 			AttInMetadata *attinmeta;
@@ -948,6 +1010,95 @@ materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
 	PG_END_TRY();
 }
 
+static void
+materializeBgResult(FunctionCallInfo fcinfo, BackgroundSession *bgconn, BackgroundSessionResult *res)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	bool		is_sql_cmd;
+	int			nfields;
+	AttInMetadata *attinmeta;
+	HeapTuple	outtuple;
+	Tuplestorestate *tupstore;
+	MemoryContext oldcontext;
+	char	  **values;
+
+	/* prepTuplestoreResult must have been called previously */
+	Assert(rsinfo->returnMode == SFRM_Materialize);
+
+	if (!res->tupdesc)
+	{
+		is_sql_cmd = true;
+
+		/*
+		 * need a tuple descriptor representing one TEXT column to return
+		 * the command status string as our result tuple
+		 */
+		tupdesc = CreateTemplateTupleDesc(1, false);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+						   TEXTOID, -1, 0);
+		nfields = 1;
+	}
+	else
+	{
+		is_sql_cmd = false;
+		nfields = res->tupdesc->natts;
+		tupdesc = dblink_get_call_result_type(fcinfo, nfields);
+	}
+
+	attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+	oldcontext = MemoryContextSwitchTo(
+		rsinfo->econtext->ecxt_per_query_memory);
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+	MemoryContextSwitchTo(oldcontext);
+
+	values = (char **) palloc(nfields * sizeof(char *));
+
+	if (is_sql_cmd)
+	{
+		values[0] = (char *) res->command;
+		outtuple = BuildTupleFromCStrings(attinmeta, values);
+		tuplestore_puttuple(tupstore, outtuple);
+	}
+	else
+	{
+		int			j;
+
+		for (j = 0; j < res->ntuples; j++)
+		{
+			HeapTuple intuple = res->tuples[j];
+			int			i;
+
+			for (i = 0; i < nfields; i++)
+			{
+				bool		isnull;
+				Datum		val;
+
+				val = heap_getattr(intuple, i+1, res->tupdesc, &isnull);
+				if (isnull)
+					values[i] = NULL;
+				else
+				{
+					Oid		typOutput;
+					bool	typIsVarlena;
+
+					getTypeOutputInfo(res->tupdesc->attrs[i]->atttypid, &typOutput, &typIsVarlena);
+					values[i] = DatumGetCString(OidFunctionCall1(typOutput, val));
+				}
+			}
+
+			outtuple = BuildTupleFromCStrings(attinmeta, values);
+			tuplestore_puttuple(tupstore, outtuple);
+		}
+	}
+
+	/* clean up and return the tuplestore */
+	tuplestore_donestoring(tupstore);
+}
+
 /*
  * Execute the given SQL command and store its results into a tuplestore
  * to be returned as the result of the current function.
@@ -1165,34 +1316,7 @@ storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
 			tuplestore_end(sinfo->tuplestore);
 		sinfo->tuplestore = NULL;
 
-		/* get a tuple descriptor for our result type */
-		switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
-		{
-			case TYPEFUNC_COMPOSITE:
-				/* success */
-				break;
-			case TYPEFUNC_RECORD:
-				/* failed to determine actual type of RECORD */
-				ereport(ERROR,
-						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						 errmsg("function returning record called in context "
-								"that cannot accept type record")));
-				break;
-			default:
-				/* result type isn't composite */
-				elog(ERROR, "return type must be a row type");
-				break;
-		}
-
-		/* make sure we have a persistent copy of the tupdesc */
-		tupdesc = CreateTupleDescCopy(tupdesc);
-
-		/* check result and tuple descriptor have the same number of columns */
-		if (nfields != tupdesc->natts)
-			ereport(ERROR,
-					(errcode(ERRCODE_DATATYPE_MISMATCH),
-					 errmsg("remote query result rowtype does not match "
-							"the specified FROM clause rowtype")));
+		tupdesc = dblink_get_call_result_type(sinfo->fcinfo, nfields);
 
 		/* Prepare attinmeta for later data conversions */
 		sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
@@ -1292,13 +1416,14 @@ PG_FUNCTION_INFO_V1(dblink_is_busy);
 Datum
 dblink_is_busy(PG_FUNCTION_ARGS)
 {
-	PGconn	   *conn;
+	remoteConn	   *rconn;
 
-	dblink_init();
-	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+	rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 
-	PQconsumeInput(conn);
-	PG_RETURN_INT32(PQisBusy(conn));
+	dblink_bgsession_not_supported(rconn);
+
+	PQconsumeInput(rconn->conn);
+	PG_RETURN_INT32(PQisBusy(rconn->conn));
 }
 
 /*
@@ -1317,13 +1442,13 @@ Datum
 dblink_cancel_query(PG_FUNCTION_ARGS)
 {
 	int			res;
-	PGconn	   *conn;
+	remoteConn *rconn;
 	PGcancel   *cancel;
 	char		errbuf[256];
 
-	dblink_init();
-	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
-	cancel = PQgetCancel(conn);
+	rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+	dblink_bgsession_not_supported(rconn);
+	cancel = PQgetCancel(rconn->conn);
 
 	res = PQcancel(cancel, errbuf, 256);
 	PQfreeCancel(cancel);
@@ -1350,12 +1475,12 @@ Datum
 dblink_error_message(PG_FUNCTION_ARGS)
 {
 	char	   *msg;
-	PGconn	   *conn;
+	remoteConn *rconn;
 
-	dblink_init();
-	conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+	rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+	dblink_bgsession_not_supported(rconn);
 
-	msg = PQerrorMessage(conn);
+	msg = PQerrorMessage(rconn->conn);
 	if (msg == NULL || msg[0] == '\0')
 		PG_RETURN_TEXT_P(cstring_to_text("OK"));
 	else
@@ -1370,14 +1495,11 @@ Datum
 dblink_exec(PG_FUNCTION_ARGS)
 {
 	text	   *volatile sql_cmd_status = NULL;
-	PGconn	   *volatile conn = NULL;
+	remoteConn *volatile rconn = NULL;
 	volatile bool freeconn = false;
 
-	dblink_init();
-
 	PG_TRY();
 	{
-		PGresult   *res = NULL;
 		char	   *sql = NULL;
 		char	   *conname = NULL;
 		bool		fail = true;	/* default to backward compatible behavior */
@@ -1385,7 +1507,7 @@ dblink_exec(PG_FUNCTION_ARGS)
 		if (PG_NARGS() == 3)
 		{
 			/* must be text,text,bool */
-			dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
+			dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &rconn, &conname, &freeconn);
 			sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 			fail = PG_GETARG_BOOL(2);
 		}
@@ -1394,72 +1516,89 @@ dblink_exec(PG_FUNCTION_ARGS)
 			/* might be text,text or text,bool */
 			if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
 			{
-				conn = pconn->conn;
+				rconn = pconn;
 				sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
 				fail = PG_GETARG_BOOL(1);
 			}
 			else
 			{
-				dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn);
+				dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &rconn, &conname, &freeconn);
 				sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 			}
 		}
 		else if (PG_NARGS() == 1)
 		{
 			/* must be single text argument */
-			conn = pconn->conn;
+			rconn = pconn;
 			sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
 		}
 		else
 			/* shouldn't happen */
 			elog(ERROR, "wrong number of arguments");
 
-		if (!conn)
+		if (!rconn)
 			dblink_conn_not_avail(conname);
 
-		res = PQexec(conn, sql);
-		if (!res ||
-			(PQresultStatus(res) != PGRES_COMMAND_OK &&
-			 PQresultStatus(res) != PGRES_TUPLES_OK))
+		if (rconn->conn)
 		{
-			dblink_res_error(conn, conname, res,
-							 "could not execute command", fail);
+			PGresult   *res;
 
-			/*
-			 * and save a copy of the command status string to return as our
-			 * result tuple
-			 */
-			sql_cmd_status = cstring_to_text("ERROR");
-		}
-		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
-		{
-			/*
-			 * and save a copy of the command status string to return as our
-			 * result tuple
-			 */
-			sql_cmd_status = cstring_to_text(PQcmdStatus(res));
-			PQclear(res);
+			res = PQexec(rconn->conn, sql);
+
+			if (!res ||
+				(PQresultStatus(res) != PGRES_COMMAND_OK &&
+				 PQresultStatus(res) != PGRES_TUPLES_OK))
+			{
+				dblink_res_error(rconn->conn, conname, res,
+								 "could not execute command", fail);
+
+				/*
+				 * and save a copy of the command status string to return as our
+				 * result tuple
+				 */
+				sql_cmd_status = cstring_to_text("ERROR");
+			}
+			else if (PQresultStatus(res) == PGRES_COMMAND_OK)
+			{
+				/*
+				 * and save a copy of the command status string to return as our
+				 * result tuple
+				 */
+				sql_cmd_status = cstring_to_text(PQcmdStatus(res));
+				PQclear(res);
+			}
+			else
+			{
+				PQclear(res);
+				ereport(ERROR,
+						(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+						 errmsg("statement returning results not allowed")));
+			}
 		}
 		else
 		{
-			PQclear(res);
-			ereport(ERROR,
-				  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
-				   errmsg("statement returning results not allowed")));
+			BackgroundSessionResult *res;
+
+			res = BackgroundSessionExecute(rconn->bgconn, sql);
+			if (res->tupdesc)
+				ereport(ERROR,
+						(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
+						 errmsg("statement returning results not allowed")));
+			sql_cmd_status = cstring_to_text(res->command);
 		}
 	}
 	PG_CATCH();
 	{
 		/* if needed, close the connection to the database */
 		if (freeconn)
-			PQfinish(conn);
+			dblink_finish_conn(rconn);
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
 
 	/* if needed, close the connection to the database */
 	if (freeconn)
-		PQfinish(conn);
+		dblink_finish_conn(rconn);
 
 	PG_RETURN_TEXT_P(sql_cmd_status);
 }
@@ -1866,7 +2005,7 @@ PG_FUNCTION_INFO_V1(dblink_get_notify);
 Datum
 dblink_get_notify(PG_FUNCTION_ARGS)
 {
-	PGconn	   *conn;
+	remoteConn *rconn;
 	PGnotify   *notify;
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
@@ -1876,11 +2015,12 @@ dblink_get_notify(PG_FUNCTION_ARGS)
 
 	prepTuplestoreResult(fcinfo);
 
-	dblink_init();
 	if (PG_NARGS() == 1)
-		conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
+		rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
 	else
-		conn = pconn->conn;
+		rconn = pconn;
+
+	dblink_bgsession_not_supported(rconn);
 
 	/* create the tuplestore in per-query memory */
 	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
@@ -1900,8 +2040,8 @@ dblink_get_notify(PG_FUNCTION_ARGS)
 
 	MemoryContextSwitchTo(oldcontext);
 
-	PQconsumeInput(conn);
-	while ((notify = PQnotifies(conn)) != NULL)
+	PQconsumeInput(rconn->conn);
+	while ((notify = PQnotifies(rconn->conn)) != NULL)
 	{
 		Datum		values[DBLINK_NOTIFY_COLS];
 		bool		nulls[DBLINK_NOTIFY_COLS];
@@ -1924,7 +2064,7 @@ dblink_get_notify(PG_FUNCTION_ARGS)
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 
 		PQfreemem(notify);
-		PQconsumeInput(conn);
+		PQconsumeInput(rconn->conn);
 	}
 
 	/* clean up and return the tuplestore */
@@ -2557,7 +2697,7 @@ createNewConnection(const char *name, remoteConn *rconn)
 
 	if (found)
 	{
-		PQfinish(rconn->conn);
+		dblink_finish_conn(rconn);
 		pfree(rconn);
 
 		ereport(ERROR,
@@ -2592,15 +2732,14 @@ deleteConnection(const char *name)
 }
 
 static void
-dblink_security_check(PGconn *conn, remoteConn *rconn)
+dblink_security_check(remoteConn *rconn)
 {
-	if (!superuser())
+	if (!superuser() && rconn->conn)
 	{
-		if (!PQconnectionUsedPassword(conn))
+		if (!PQconnectionUsedPassword(rconn->conn))
 		{
-			PQfinish(conn);
-			if (rconn)
-				pfree(rconn);
+			PQfinish(rconn->conn);
+			pfree(rconn);
 
 			ereport(ERROR,
 				  (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
diff --git a/contrib/dblink/dblink.control b/contrib/dblink/dblink.control
index bdd17d28a4..816d19f448 100644
--- a/contrib/dblink/dblink.control
+++ b/contrib/dblink/dblink.control
@@ -1,5 +1,5 @@
 # dblink extension
 comment = 'connect to other PostgreSQL databases from within a database'
-default_version = '1.2'
+default_version = '1.3'
 module_pathname = '$libdir/dblink'
 relocatable = true
diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out
index 4b6d26e574..6faa525f0c 100644
--- a/contrib/dblink/expected/dblink.out
+++ b/contrib/dblink/expected/dblink.out
@@ -1124,3 +1124,98 @@ SELECT dblink_disconnect('myconn');
 RESET datestyle;
 RESET intervalstyle;
 RESET timezone;
+-- background sessions
+SELECT * FROM foo;
+ f1 | f2 |      f3       
+----+----+---------------
+  0 | a  | {a0,b0,c0}
+  1 | b  | {a1,b1,c1}
+  2 | c  | {a2,b2,c2}
+  3 | d  | {a3,b3,c3}
+  4 | e  | {a4,b4,c4}
+  5 | f  | {a5,b5,c5}
+  6 | g  | {a6,b6,c6}
+  7 | h  | {a7,b7,c7}
+  8 | i  | {a8,b8,c8}
+  9 | j  | {a9,b9,c9}
+ 10 | k  | {a10,b10,c10}
+(11 rows)
+
+SELECT dblink_connect_self();
+ dblink_connect_self 
+---------------------
+ OK
+(1 row)
+
+SELECT dblink_exec('SELECT * FROM foo');
+ERROR:  statement returning results not allowed
+SELECT dblink_exec('DELETE FROM foo WHERE f1 > 5');
+ dblink_exec 
+-------------
+ DELETE 5
+(1 row)
+
+SELECT * FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]);
+ a | b |     c      
+---+---+------------
+ 0 | a | {a0,b0,c0}
+ 1 | b | {a1,b1,c1}
+ 2 | c | {a2,b2,c2}
+ 3 | d | {a3,b3,c3}
+ 4 | e | {a4,b4,c4}
+ 5 | f | {a5,b5,c5}
+(6 rows)
+
+SELECT * FROM dblink('DELETE FROM foo WHERE f1 > 5') AS (status text);
+  status  
+----------
+ DELETE 0
+(1 row)
+
+SELECT dblink_disconnect();
+ dblink_disconnect 
+-------------------
+ OK
+(1 row)
+
+SELECT * FROM foo;
+ f1 | f2 |     f3     
+----+----+------------
+  0 | a  | {a0,b0,c0}
+  1 | b  | {a1,b1,c1}
+  2 | c  | {a2,b2,c2}
+  3 | d  | {a3,b3,c3}
+  4 | e  | {a4,b4,c4}
+  5 | f  | {a5,b5,c5}
+(6 rows)
+
+-- asynchronous case
+SELECT dblink_connect_self('myconn');
+ dblink_connect_self 
+---------------------
+ OK
+(1 row)
+
+SELECT *
+FROM dblink_send_query('myconn',
+    'SELECT * FROM
+     (VALUES (''12.03.2013 00:00:00+00''),
+             (''12.03.2013 00:00:00+00'')) t');
+ dblink_send_query 
+-------------------
+                 1
+(1 row)
+
+SELECT * from dblink_get_result('myconn') as t(t timestamptz);
+              t               
+------------------------------
+ Mon Dec 02 16:00:00 2013 PST
+ Mon Dec 02 16:00:00 2013 PST
+(2 rows)
+
+SELECT dblink_disconnect('myconn');
+ dblink_disconnect 
+-------------------
+ OK
+(1 row)
+
diff --git a/contrib/dblink/sql/dblink.sql b/contrib/dblink/sql/dblink.sql
index 681cf6a6e8..7f66a98aa5 100644
--- a/contrib/dblink/sql/dblink.sql
+++ b/contrib/dblink/sql/dblink.sql
@@ -563,3 +563,23 @@ CREATE TEMPORARY TABLE result (t timestamptz);
 RESET datestyle;
 RESET intervalstyle;
 RESET timezone;
+
+-- background sessions
+SELECT * FROM foo;
+SELECT dblink_connect_self();
+SELECT dblink_exec('SELECT * FROM foo');
+SELECT dblink_exec('DELETE FROM foo WHERE f1 > 5');
+SELECT * FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]);
+SELECT * FROM dblink('DELETE FROM foo WHERE f1 > 5') AS (status text);
+SELECT dblink_disconnect();
+SELECT * FROM foo;
+
+-- asynchronous case
+SELECT dblink_connect_self('myconn');
+SELECT *
+FROM dblink_send_query('myconn',
+    'SELECT * FROM
+     (VALUES (''12.03.2013 00:00:00+00''),
+             (''12.03.2013 00:00:00+00'')) t');
+SELECT * from dblink_get_result('myconn') as t(t timestamptz);
+SELECT dblink_disconnect('myconn');
diff --git a/doc/src/sgml/dblink.sgml b/doc/src/sgml/dblink.sgml
index f19c6b19f5..4b070ddb23 100644
--- a/doc/src/sgml/dblink.sgml
+++ b/doc/src/sgml/dblink.sgml
@@ -239,6 +239,43 @@ <title>Description</title>
   </refsect1>
  </refentry>
 
+ <refentry id="CONTRIB-DBLINK-CONNECT-SELF">
+  <indexterm>
+   <primary>dblink_connect_self</primary>
+  </indexterm>
+
+  <refmeta>
+   <refentrytitle>dblink_connect_self</refentrytitle>
+   <manvolnum>3</manvolnum>
+  </refmeta>
+
+  <refnamediv>
+   <refname>dblink_connect_self</refname>
+   <refpurpose>opens a persistent connection to the same database</refpurpose>
+  </refnamediv>
+
+  <refsynopsisdiv>
+<synopsis>
+dblink_connect_self() returns text
+dblink_connect_self(text connname) returns text
+</synopsis>
+  </refsynopsisdiv>
+
+  <refsect1>
+   <title>Description</title>
+
+   <para>
+    <function>dblink_connect_self()</> opens a connection to the current
+    database and as the current user, using the background session API
+    (<xref linkend="bgsession">).
+   </para>
+
+   <para>
+    For further details see <function>dblink_connect()</>.
+   </para>
+  </refsect1>
+ </refentry>
+
  <refentry id="CONTRIB-DBLINK-DISCONNECT">
   <indexterm>
    <primary>dblink_disconnect</primary>
-- 
2.12.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to