Here is a patch for the background sessions C API and PL/Python support.
 This was previously submitted as "autonomous transactions", which
proved controversial, and there have been several suggestions for a new
name.

I have renamed everything, removed all the incomplete PL/pgSQL stuff,
did some refinement on the PL/Python interfaces, added resource owner
management so that you can preserve session handles across transactions.
 That allows a pg_background-like behavior implemented in a PL function.
 I have also added documentation, so reviewers could start there.
Probably not quite all done yet, but I think it contains a lot of useful
pieces that we could make into something nice.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 661c7fe769982e3f0c71f4ad57a768d7eb55f6e2 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Mon, 31 Oct 2016 12:00:00 -0400
Subject: [PATCH] Add background sessions

This adds a C API to run SQL statements in a background worker,
communicating by FE/BE protocol over a DSM message queue.  This can be
used to execute statements and transactions separate from the main
foreground session.

Also included is a PL/Python interface to this functionality.
---
 doc/src/sgml/bgsession.sgml                     | 236 +++++++
 doc/src/sgml/filelist.sgml                      |   1 +
 doc/src/sgml/plpython.sgml                      | 102 +++
 doc/src/sgml/postgres.sgml                      |   1 +
 src/backend/commands/variable.c                 |   5 +
 src/backend/libpq/pqmq.c                        |  23 +
 src/backend/tcop/Makefile                       |   2 +-
 src/backend/tcop/bgsession.c                    | 890 ++++++++++++++++++++++++
 src/backend/tcop/postgres.c                     |  24 +-
 src/include/commands/variable.h                 |   1 +
 src/include/libpq/pqmq.h                        |   1 +
 src/include/tcop/bgsession.h                    |  26 +
 src/include/tcop/tcopprot.h                     |   9 +
 src/pl/plpython/Makefile                        |   2 +
 src/pl/plpython/expected/plpython_bgsession.out | 188 +++++
 src/pl/plpython/expected/plpython_test.out      |   7 +-
 src/pl/plpython/plpy_bgsession.c                | 454 ++++++++++++
 src/pl/plpython/plpy_bgsession.h                |  18 +
 src/pl/plpython/plpy_main.h                     |   3 +
 src/pl/plpython/plpy_planobject.c               |   1 +
 src/pl/plpython/plpy_planobject.h               |   2 +
 src/pl/plpython/plpy_plpymodule.c               |   5 +
 src/pl/plpython/plpy_spi.c                      |   7 +-
 src/pl/plpython/plpy_spi.h                      |   3 +
 src/pl/plpython/sql/plpython_bgsession.sql      | 148 ++++
 25 files changed, 2139 insertions(+), 20 deletions(-)
 create mode 100644 doc/src/sgml/bgsession.sgml
 create mode 100644 src/backend/tcop/bgsession.c
 create mode 100644 src/include/tcop/bgsession.h
 create mode 100644 src/pl/plpython/expected/plpython_bgsession.out
 create mode 100644 src/pl/plpython/plpy_bgsession.c
 create mode 100644 src/pl/plpython/plpy_bgsession.h
 create mode 100644 src/pl/plpython/sql/plpython_bgsession.sql

diff --git a/doc/src/sgml/bgsession.sgml b/doc/src/sgml/bgsession.sgml
new file mode 100644
index 0000000..785ee66
--- /dev/null
+++ b/doc/src/sgml/bgsession.sgml
@@ -0,0 +1,236 @@
+<!-- doc/src/sgml/bgsession.sgml -->
+
+<chapter id="bgsession">
+ <title>Background Session API</title>
+
+ <para>
+  The background session API is a C API for creating additional database
+  sessions in the background and running SQL statements in them.  A background
+  session behaves like a normal (foreground) session in that it has session
+  state, transactions, can run SQL statements, and so on.  Unlike a foreground
+  session, it is not connected directly to a client.  Instead the foreground
+  session can use this API to execute SQL statements and retrieve their
+  results.  Higher-level integrations, such as in procedural languages, can
+  make this functionality available to clients.  Background sessions are
+  independent from their foreground sessions in their session and transaction
+  state.  So a background session cannot see uncommitted data in foreground
+  sessions or vice versa, and there is no preferential treatment about
+  locking.  Like all sessions, background sessions are separate processes.
+  Foreground and background sessions communicate over shared memory messages
+  queues instead of the sockets that a client/server connection uses.
+ </para>
+
+ <para>
+  Background sessions can be useful in a variety of scenarios when effects
+  that are independent of the foreground session are to be achieved, for
+  example:
+  <itemizedlist>
+   <listitem>
+    <para>
+     Commit data independent of whether a foreground transaction commits, for
+     example for auditing.  A trigger in the foreground session could effect
+     the necessary writes via a background session.
+    </para>
+   </listitem>
+   <listitem>
+    <para>
+     Large changes can be split up into smaller transactions.  A foreground
+     session can orchestrate the logic, for example in a function, while the
+     actual writes and commits are executed in a background session.
+    </para>
+   </listitem>
+  </itemizedlist>
+ </para>
+
+ <sect1 id="bgsession-types">
+  <title>API</title>
+
+  <para>
+   Use <literal>#include "tcop/bgsession.h"</literal> to include the API
+   declarations.
+  </para>
+
+  <sect2>
+   <title>Types</title>
+
+   <variablelist>
+    <varlistentry>
+     <term><type>BackgroundSession</type></term>
+     <listitem>
+      <para>
+       An opaque connection handle.  Multiple background sessions can exist at
+       the same time, and each is prepresented by an instance of this type.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><type>BackgroundSessionPreparedStatement</type></term>
+     <listitem>
+      <para>
+       An opaque handle for a prepared statement, created when the statement
+       is prepared and used when the statement is executed.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><type>BackgroundSessionResult</type></term>
+     <listitem>
+      <para>
+       A handle for a query result, defined as:
+<programlisting>
+typedef struct BackgroundSessionResult
+{
+    TupleDesc   tupdesc;
+    List       *tuples;
+    const char *command;
+} BackgroundSessionResult;
+</programlisting>
+       <structfield>tupdesc</structfield> describes the result
+       rows, <symbol>NULL</symbol> if the command does not return
+       rows.  <structfield>tuples</structfield> is a list
+       of <type>HeapTuple</type>s with the result
+       rows.  <structfield>command</structfield> is the tag of the executed
+       command.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </sect2>
+
+  <sect2>
+   <title>Functions</title>
+
+   <variablelist>
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSession *<function>BackgroundSessionStart</function></funcdef>
+        <paramdef>void</paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Creates a background session.  This starts the background worker and
+       establishes a connection to it.
+      </para>
+
+      <para>
+       A background session does not automatically end when the foreground
+       transaction or session ends.
+       Use <function>BackgroundSessionEnd()</function> to end a background
+       session.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>void <function>BackgroundSessionEnd</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Ends a background session.  This closes the connection the background
+       worker.
+      </para>
+
+      <para>
+       It is an error to close a background session with a transaction block
+       open.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionResult *<function>BackgroundSessionExecute</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+        <paramdef>const char *<parameter>sql</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Execute an SQL statement and return the result.  Access the fields of
+       the result structure to get details about the command result.  If there
+       is an error, this function does not return.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionPreparedStatement *<function>BackgroundSessionPrepare</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+        <paramdef>const char *<parameter>sql</parameter></paramdef>
+        <paramdef>int <parameter>nargs</parameter></paramdef>
+        <paramdef>Oid <parameter>argtypes</parameter>[]</paramdef>
+        <paramdef>const char *<parameter>argnames</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Prepare an SQL statement for later execution
+       by <function>BackgroundSessionExecutePrepared()</function>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionResult *<function>BackgroundSessionExecutePrepared</function></funcdef>
+        <paramdef>BackgroundSessionPreparedStatement *<parameter>stmt</parameter></paramdef>
+        <paramdef>int <parameter>nargs</parameter></paramdef>
+        <paramdef>Datum <parameter>values</parameter>[]</paramdef>
+        <paramdef>bool <parameter>nulls</parameter>[]</paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Execute a statement previously prepared by
+       <function>BackgroundSessionPrepare()</function>.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+
+   <para>
+    Here is a very simple example:
+
+<programlisting>
+#include "tcop/bgsession.h"
+
+void
+myfunc()
+{
+    BackgroundSession *session;
+    BackgroundSessionResult *result;
+
+    session = BackgroundSessionStart();
+
+    result = BackgroundSessionExecute(session, "SELECT ...");
+    elog(INFO, "returned %d rows", list_length(result->tuples));
+
+    BackgroundSessionEnd(session);
+}
+</programlisting>
+   </para>
+  </sect2>
+ </sect1>
+</chapter>
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index 69649a7..7531c8f 100644
--- a/doc/src/sgml/filelist.sgml
+++ b/doc/src/sgml/filelist.sgml
@@ -52,6 +52,7 @@
 <!ENTITY wal           SYSTEM "wal.sgml">
 
 <!-- programmer's guide -->
+<!ENTITY bgsession  SYSTEM "bgsession.sgml">
 <!ENTITY bgworker   SYSTEM "bgworker.sgml">
 <!ENTITY dfunc      SYSTEM "dfunc.sgml">
 <!ENTITY ecpg       SYSTEM "ecpg.sgml">
diff --git a/doc/src/sgml/plpython.sgml b/doc/src/sgml/plpython.sgml
index 4639778..53a2e8f 100644
--- a/doc/src/sgml/plpython.sgml
+++ b/doc/src/sgml/plpython.sgml
@@ -1357,6 +1357,108 @@ <title>Older Python Versions</title>
   </sect2>
  </sect1>
 
+ <sect1 id="plpython-bgsession">
+  <title>Background Sessions</title>
+
+  <para>
+   PL/Python exposes the background session interface described
+   in <xref linkend="bgsession"> as a Python API.  A background session is
+   represented by a Python object of type <type>plpy.BackgroundSession</type>.
+   It has these methods:
+
+   <variablelist>
+    <varlistentry>
+     <term><literal>.close()</literal></term>
+     <listitem>
+      <para>
+       Close the session.  This must be called or the session will leak.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.execute(<parameter>sql</parameter>)</literal></term>
+     <listitem>
+      <para>
+       Execute an SQL statement.  The return value is a result object of the
+       same type as returned by <literal>plpy.execute</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.prepare(<parameter>sql</parameter>, <replaceable>types</replaceable>)</literal></term>
+     <listitem>
+      <para>
+       Prepare an SQL statement.  The parameter data types are specified as
+       for <literal>plpy.prepare</literal>.  The return value is a plan
+       object.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.execute_prepared(<parameter>plan</parameter>, <replaceable>values</replaceable>)</literal></term>
+     <listitem>
+      <para>
+       Execute a prepared statement.  The parameter values are specified as
+       for <literal>plpy.execute</literal>.  The return value is a result
+       object.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.__enter__()</literal></term>
+     <listitem>
+      <para>
+       This just returns self.  It is supplied so that
+       a <classname>BackgroundSession</classname> object can be used as a
+       context manager.  See example below.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.__exit__(<parameter>exc_type</parameter>, <parameter>exc</parameter>, <parameter>tb</parameter>)</literal></term>
+     <listitem>
+      <para>
+       This is the same as <literal>.close()</literal>.  It is supplied so
+       that a <classname>BackgroundSession</classname> object can be used as a
+       context manager.  (The arguments are ignored.)
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </para>
+
+  <para>
+   A <classname>BackgroundSession</classname> object can be preserved across
+   function calls or passed between functions via the <varname>SD</varname>
+   and <varname>GD</varname> variables.
+  </para>
+
+  <para>
+   Example:
+<programlisting>
+CREATE FUNCTION bulkload() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    for i in range(1, 1001):
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+        if i % 100 == 0:
+            a.execute("COMMIT")
+
+return 0
+$$;
+</programlisting>
+   This function inserts 1000 values into the table and commits after every
+   100 values.
+  </para>
+ </sect1>
+
  <sect1 id="plpython-util">
   <title>Utility Functions</title>
   <para>
diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml
index 9143917..e281002 100644
--- a/doc/src/sgml/postgres.sgml
+++ b/doc/src/sgml/postgres.sgml
@@ -220,6 +220,7 @@ <title>Server Programming</title>
 
   &spi;
   &bgworker;
+  &bgsession;
   &logicaldecoding;
   &replication-origins;
 
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index defafa5..a522c69 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -674,12 +674,17 @@ show_random_seed(void)
  * SET CLIENT_ENCODING
  */
 
+void (*check_client_encoding_hook)(void);
+
 bool
 check_client_encoding(char **newval, void **extra, GucSource source)
 {
 	int			encoding;
 	const char *canonical_name;
 
+	if (check_client_encoding_hook)
+		check_client_encoding_hook();
+
 	/* Look up the encoding by name */
 	encoding = pg_valid_client_encoding(*newval);
 	if (encoding < 0)
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index f93ccae..160f05b 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -48,6 +48,11 @@ static PQcommMethods PqCommMqMethods = {
 	mq_endcopyout
 };
 
+static PQcommMethods *save_PqCommMethods;
+static CommandDest save_whereToSendOutput;
+static ProtocolVersion save_FrontendProtocol;
+static dsm_segment *save_seg;
+
 /*
  * Arrange to redirect frontend/backend protocol messages to a shared-memory
  * message queue.
@@ -55,12 +60,30 @@ static PQcommMethods PqCommMqMethods = {
 void
 pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 {
+	save_PqCommMethods = PqCommMethods;
+	save_whereToSendOutput = whereToSendOutput;
+	save_FrontendProtocol = FrontendProtocol;
+
 	PqCommMethods = &PqCommMqMethods;
 	pq_mq = shm_mq_get_queue(mqh);
 	pq_mq_handle = mqh;
 	whereToSendOutput = DestRemote;
 	FrontendProtocol = PG_PROTOCOL_LATEST;
 	on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
+
+	save_seg = seg;
+}
+
+void
+pq_stop_redirect_to_shm_mq(void)
+{
+	cancel_on_dsm_detach(save_seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
+	PqCommMethods = save_PqCommMethods;
+	whereToSendOutput = save_whereToSendOutput;
+	FrontendProtocol = save_FrontendProtocol;
+	pq_mq = NULL;
+	pq_mq_handle = NULL;
+	save_seg = NULL;
 }
 
 /*
diff --git a/src/backend/tcop/Makefile b/src/backend/tcop/Makefile
index 674302f..ab26767 100644
--- a/src/backend/tcop/Makefile
+++ b/src/backend/tcop/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/tcop
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS= dest.o fastpath.o postgres.o pquery.o utility.o
+OBJS= bgsession.o dest.o fastpath.o postgres.o pquery.o utility.o
 
 ifneq (,$(filter $(PORTNAME),cygwin win32))
 override CPPFLAGS += -DWIN32_STACK_RLIMIT=$(WIN32_STACK_RLIMIT)
diff --git a/src/backend/tcop/bgsession.c b/src/backend/tcop/bgsession.c
new file mode 100644
index 0000000..2cc6438
--- /dev/null
+++ b/src/backend/tcop/bgsession.c
@@ -0,0 +1,890 @@
+/*--------------------------------------------------------------------------
+ *
+ * bgsession.c
+ *		Run SQL commands using a background worker.
+ *
+ * Copyright (C) 2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/backend/tcop/bgsession.c
+ *
+ *
+ * This implements a C API to open a background session and run SQL queries
+ * in it.  The session looks much like a normal database connection, but it is
+ * always to the same database, and there is no authentication needed.  The
+ * "backend" for that connection is a background worker.  The normal backend
+ * and the background session worker communicate over the normal FE/BE
+ * protocol.
+ *
+ * Types:
+ *
+ * BackgroundSession -- opaque connection handle
+ * BackgroundSessionPreparedStatement -- opaque prepared statement handle
+ * BackgroundSessionResult -- query result
+ *
+ * Functions:
+ *
+ * BackgroundSessionStart() -- start a session (launches background worker)
+ * and return a handle
+ *
+ * BackgroundSessionEnd() -- close session and free resources
+ *
+ * BackgroundSessionExecute() -- run SQL string and return result (rows or
+ * status)
+ *
+ * BackgroundSessionPrepare() -- prepare an SQL string for subsequent
+ * execution
+ *
+ * BackgroundSessionExecutePrepared() -- run prepared statement
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/tupdesc.h"
+#include "access/xact.h"
+#include "commands/async.h"
+#include "commands/variable.h"
+#include "lib/stringinfo.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "nodes/pg_list.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "tcop/bgsession.h"
+#include "tcop/tcopprot.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+/* Table-of-contents constants for our dynamic shared memory segment. */
+#define BGSESSION_MAGIC					0x50674267
+
+#define BGSESSION_KEY_FIXED_DATA		0
+#define BGSESSION_KEY_GUC				1
+#define BGSESSION_KEY_COMMAND_QUEUE		2
+#define BGSESSION_KEY_RESPONSE_QUEUE	3
+#define BGSESSION_NKEYS					4
+
+#define BGSESSION_QUEUE_SIZE			16384
+
+/* Fixed-size data passed via our dynamic shared memory segment. */
+typedef struct background_session_fixed_data
+{
+	Oid database_id;
+	Oid authenticated_user_id;
+	Oid current_user_id;
+	int sec_context;
+} background_session_fixed_data;
+
+struct BackgroundSession
+{
+	ResourceOwner resowner;
+	dsm_segment *seg;
+	BackgroundWorkerHandle *worker_handle;
+	shm_mq_handle *command_qh;
+	shm_mq_handle *response_qh;
+	int		transaction_status;
+};
+
+struct BackgroundSessionPreparedStatement
+{
+	BackgroundSession *session;
+	Oid		   *argtypes;
+	TupleDesc	tupdesc;
+};
+
+static void bgsession_worker_main(Datum main_arg);
+static void shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg);
+static void bgsession_check_client_encoding_hook(void);
+static TupleDesc TupleDesc_from_RowDescription(StringInfo msg);
+static HeapTuple HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg);
+static void forward_NotifyResponse(StringInfo msg);
+static void rethrow_errornotice(StringInfo msg);
+static void invalid_protocol_message(char msgtype) pg_attribute_noreturn();
+
+
+BackgroundSession *
+BackgroundSessionStart(void)
+{
+	ResourceOwner oldowner;
+	BackgroundWorker worker;
+	pid_t		pid;
+	BackgroundSession *session;
+	shm_toc_estimator e;
+	Size		segsize;
+	Size		guc_len;
+	char	   *gucstate;
+	dsm_segment *seg;
+	shm_toc	   *toc;
+	background_session_fixed_data *fdata;
+	shm_mq	   *command_mq;
+	shm_mq	   *response_mq;
+	BgwHandleStatus bgwstatus;
+	StringInfoData msg;
+	char		msgtype;
+
+	session = palloc(sizeof(*session));
+
+	session->resowner = ResourceOwnerCreate(NULL, "background session");
+
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(background_session_fixed_data));
+	shm_toc_estimate_chunk(&e, BGSESSION_QUEUE_SIZE);
+	shm_toc_estimate_chunk(&e, BGSESSION_QUEUE_SIZE);
+	guc_len = EstimateGUCStateSpace();
+	shm_toc_estimate_chunk(&e, guc_len);
+	shm_toc_estimate_keys(&e, BGSESSION_NKEYS);
+	segsize = shm_toc_estimate(&e);
+	oldowner = CurrentResourceOwner;
+	PG_TRY();
+	{
+		CurrentResourceOwner = session->resowner;
+		seg = dsm_create(segsize, 0);
+	}
+	PG_CATCH();
+	{
+		CurrentResourceOwner = oldowner;
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+	CurrentResourceOwner = oldowner;
+
+	session->seg = seg;
+
+	toc = shm_toc_create(BGSESSION_MAGIC, dsm_segment_address(seg), segsize);
+
+	/* Store fixed-size data in dynamic shared memory. */
+	fdata = shm_toc_allocate(toc, sizeof(*fdata));
+	fdata->database_id = MyDatabaseId;
+	fdata->authenticated_user_id = GetAuthenticatedUserId();
+	GetUserIdAndSecContext(&fdata->current_user_id, &fdata->sec_context);
+	shm_toc_insert(toc, BGSESSION_KEY_FIXED_DATA, fdata);
+
+	/* Store GUC state in dynamic shared memory. */
+	gucstate = shm_toc_allocate(toc, guc_len);
+	SerializeGUCState(guc_len, gucstate);
+	shm_toc_insert(toc, BGSESSION_KEY_GUC, gucstate);
+
+	command_mq = shm_mq_create(shm_toc_allocate(toc, BGSESSION_QUEUE_SIZE),
+							   BGSESSION_QUEUE_SIZE);
+	shm_toc_insert(toc, BGSESSION_KEY_COMMAND_QUEUE, command_mq);
+	shm_mq_set_sender(command_mq, MyProc);
+
+	response_mq = shm_mq_create(shm_toc_allocate(toc, BGSESSION_QUEUE_SIZE),
+								BGSESSION_QUEUE_SIZE);
+	shm_toc_insert(toc, BGSESSION_KEY_RESPONSE_QUEUE, response_mq);
+	shm_mq_set_receiver(response_mq, MyProc);
+
+	session->command_qh = shm_mq_attach(command_mq, seg, NULL);
+	session->response_qh = shm_mq_attach(response_mq, seg, NULL);
+
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_main = bgsession_worker_main;
+	snprintf(worker.bgw_name, BGW_MAXLEN, "background session by PID %d", MyProcPid);
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+	worker.bgw_notify_pid = MyProcPid;
+
+	if (!RegisterDynamicBackgroundWorker(&worker, &session->worker_handle))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not register background process"),
+				 errhint("You might need to increase max_worker_processes.")));
+
+	shm_mq_set_handle(session->command_qh, session->worker_handle);
+	shm_mq_set_handle(session->response_qh, session->worker_handle);
+
+	bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle, &pid);
+	if (bgwstatus != BGWH_STARTED)
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not start background worker")));
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'E':
+			case 'N':
+				rethrow_errornotice(&msg);
+				break;
+			case 'Z':
+				session->transaction_status = pq_getmsgbyte(&msg);
+				pq_getmsgend(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'Z');
+
+	return session;
+}
+
+
+void
+BackgroundSessionEnd(BackgroundSession *session)
+{
+	StringInfoData msg;
+
+	if (session->transaction_status == 'T')
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("background session ended with transaction block open")));
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'X');
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	pfree(session->worker_handle);
+	dsm_detach(session->seg);
+	ResourceOwnerRelease(session->resowner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false);
+	ResourceOwnerDelete(session->resowner);
+	pfree(session);
+}
+
+
+BackgroundSessionResult *
+BackgroundSessionExecute(BackgroundSession *session, const char *sql)
+{
+	StringInfoData msg;
+	char		msgtype;
+	BackgroundSessionResult *result;
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'Q');
+	pq_sendstring(&msg, sql);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	result = palloc0(sizeof(*result));
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'A':
+				forward_NotifyResponse(&msg);
+				break;
+			case 'C':
+				{
+					const char *tag = pq_getmsgstring(&msg);
+					result->command = pstrdup(tag);
+					pq_getmsgend(&msg);
+					break;
+				}
+			case 'D':
+				if (!result->tupdesc)
+					elog(ERROR, "no T before D");
+				result->tuples = lappend(result->tuples, HeapTuple_from_DataRow(result->tupdesc, &msg));
+				pq_getmsgend(&msg);
+				break;
+			case 'E':
+			case 'N':
+				rethrow_errornotice(&msg);
+				break;
+			case 'T':
+				if (result->tupdesc)
+					elog(ERROR, "already received a T message");
+				result->tupdesc = TupleDesc_from_RowDescription(&msg);
+				pq_getmsgend(&msg);
+				break;
+			case 'Z':
+				session->transaction_status = pq_getmsgbyte(&msg);
+				pq_getmsgend(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'Z');
+
+	return result;
+}
+
+
+BackgroundSessionPreparedStatement *
+BackgroundSessionPrepare(BackgroundSession *session, const char *sql, int nargs,
+						 Oid argtypes[], const char *argnames[])
+{
+	BackgroundSessionPreparedStatement *result;
+	StringInfoData msg;
+	int			i;
+	char		msgtype;
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'P');
+	pq_sendstring(&msg, "");
+	pq_sendstring(&msg, sql);
+	pq_sendint(&msg, nargs, 2);
+	for (i = 0; i < nargs; i++)
+		pq_sendint(&msg, argtypes[i], 4);
+	if (argnames)
+		for (i = 0; i < nargs; i++)
+			pq_sendstring(&msg, argnames[i]);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	result = palloc0(sizeof(*result));
+	result->session = session;
+	result->argtypes = palloc(nargs * sizeof(*result->argtypes));
+	memcpy(result->argtypes, argtypes, nargs * sizeof(*result->argtypes));
+
+	shm_mq_receive_stringinfo(session->response_qh, &msg);
+	msgtype = pq_getmsgbyte(&msg);
+
+	switch (msgtype)
+	{
+		case '1':
+			break;
+		case 'E':
+			rethrow_errornotice(&msg);
+			break;
+		default:
+			invalid_protocol_message(msgtype);
+			break;
+	}
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'D');
+	pq_sendbyte(&msg, 'S');
+	pq_sendstring(&msg, "");
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'A':
+				forward_NotifyResponse(&msg);
+				break;
+			case 'E':
+				rethrow_errornotice(&msg);
+				break;
+			case 'n':
+				break;
+			case 't':
+				/* ignore for now */
+				break;
+			case 'T':
+				if (result->tupdesc)
+					elog(ERROR, "already received a T message");
+				result->tupdesc = TupleDesc_from_RowDescription(&msg);
+				pq_getmsgend(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'n' && msgtype != 'T');
+
+	return result;
+}
+
+
+BackgroundSessionResult *
+BackgroundSessionExecutePrepared(BackgroundSessionPreparedStatement *stmt, int nargs, Datum values[], bool nulls[])
+{
+	BackgroundSession *session;
+	StringInfoData msg;
+	BackgroundSessionResult *result;
+	char		msgtype;
+	int			i;
+
+	session = stmt->session;
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'B');
+	pq_sendstring(&msg, "");
+	pq_sendstring(&msg, "");
+	pq_sendint(&msg, 1, 2);  /* number of parameter format codes */
+	pq_sendint(&msg, 1, 2);
+	pq_sendint(&msg, nargs, 2);  /* number of parameter values */
+	for (i = 0; i < nargs; i++)
+	{
+		if (nulls[i])
+			pq_sendint(&msg, -1, 4);
+		else
+		{
+			Oid			typsend;
+			bool		typisvarlena;
+			bytea	   *outputbytes;
+
+			getTypeBinaryOutputInfo(stmt->argtypes[i], &typsend, &typisvarlena);
+			outputbytes = OidSendFunctionCall(typsend, values[i]);
+			pq_sendint(&msg, VARSIZE(outputbytes) - VARHDRSZ, 4);
+			pq_sendbytes(&msg, VARDATA(outputbytes), VARSIZE(outputbytes) - VARHDRSZ);
+			pfree(outputbytes);
+		}
+	}
+	pq_sendint(&msg, 1, 2);  /* number of result column format codes */
+	pq_sendint(&msg, 1, 2);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	shm_mq_receive_stringinfo(session->response_qh, &msg);
+	msgtype = pq_getmsgbyte(&msg);
+
+	switch (msgtype)
+	{
+		case '2':
+			break;
+		case 'E':
+			rethrow_errornotice(&msg);
+			break;
+		default:
+			invalid_protocol_message(msgtype);
+			break;
+	}
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_beginmessage(&msg, 'E');
+	pq_sendstring(&msg, "");
+	pq_sendint(&msg, 0, 4);
+	pq_endmessage(&msg);
+	pq_stop_redirect_to_shm_mq();
+
+	result = palloc0(sizeof(*result));
+	result->tupdesc = stmt->tupdesc;
+
+	do
+	{
+		shm_mq_receive_stringinfo(session->response_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'A':
+				forward_NotifyResponse(&msg);
+				break;
+			case 'C':
+				{
+					const char *tag = pq_getmsgstring(&msg);
+					result->command = pstrdup(tag);
+					pq_getmsgend(&msg);
+					break;
+				}
+			case 'D':
+				if (!stmt->tupdesc)
+					elog(ERROR, "did not expect any rows");
+				result->tuples = lappend(result->tuples, HeapTuple_from_DataRow(stmt->tupdesc, &msg));
+				pq_getmsgend(&msg);
+				break;
+			case 'E':
+			case 'N':
+				rethrow_errornotice(&msg);
+				break;
+			default:
+				invalid_protocol_message(msgtype);
+				break;
+		}
+	}
+	while (msgtype != 'C');
+
+	pq_redirect_to_shm_mq(session->seg, session->command_qh);
+	pq_putemptymessage('S');
+	pq_stop_redirect_to_shm_mq();
+
+	shm_mq_receive_stringinfo(session->response_qh, &msg);
+	msgtype = pq_getmsgbyte(&msg);
+
+	switch (msgtype)
+	{
+		case 'A':
+			forward_NotifyResponse(&msg);
+			break;
+		case 'Z':
+			session->transaction_status = pq_getmsgbyte(&msg);
+			pq_getmsgend(&msg);
+			break;
+		default:
+			invalid_protocol_message(msgtype);
+			break;
+	}
+
+	return result;
+}
+
+
+static void
+bgsession_worker_main(Datum main_arg)
+{
+	dsm_segment *seg;
+	shm_toc	   *toc;
+	background_session_fixed_data *fdata;
+	char	   *gucstate;
+	shm_mq	   *command_mq;
+	shm_mq	   *response_mq;
+	shm_mq_handle *command_qh;
+	shm_mq_handle *response_qh;
+	StringInfoData msg;
+	char		msgtype;
+
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up a memory context and resource owner. */
+	Assert(CurrentResourceOwner == NULL);
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "background session worker");
+	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+												 "background session",
+												 ALLOCSET_DEFAULT_MINSIZE,
+												 ALLOCSET_DEFAULT_INITSIZE,
+												 ALLOCSET_DEFAULT_MAXSIZE);
+
+	seg = dsm_attach(DatumGetInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not map dynamic shared memory segment")));
+
+	toc = shm_toc_attach(BGSESSION_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/* Find data structures in dynamic shared memory. */
+	fdata = shm_toc_lookup(toc, BGSESSION_KEY_FIXED_DATA);
+
+	gucstate = shm_toc_lookup(toc, BGSESSION_KEY_GUC);
+
+	command_mq = shm_toc_lookup(toc, BGSESSION_KEY_COMMAND_QUEUE);
+	shm_mq_set_receiver(command_mq, MyProc);
+	command_qh = shm_mq_attach(command_mq, seg, NULL);
+
+	response_mq = shm_toc_lookup(toc, BGSESSION_KEY_RESPONSE_QUEUE);
+	shm_mq_set_sender(response_mq, MyProc);
+	response_qh = shm_mq_attach(response_mq, seg, NULL);
+
+	pq_redirect_to_shm_mq(seg, response_qh);
+	BackgroundWorkerInitializeConnectionByOid(fdata->database_id,
+											  fdata->authenticated_user_id);
+
+	SetClientEncoding(GetDatabaseEncoding());
+
+	StartTransactionCommand();
+	RestoreGUCState(gucstate);
+	CommitTransactionCommand();
+
+	process_session_preload_libraries();
+
+	SetUserIdAndSecContext(fdata->current_user_id, fdata->sec_context);
+
+	whereToSendOutput = DestRemote;
+	ReadyForQuery(whereToSendOutput);
+
+	MessageContext = AllocSetContextCreate(TopMemoryContext,
+										   "MessageContext",
+										   ALLOCSET_DEFAULT_MINSIZE,
+										   ALLOCSET_DEFAULT_INITSIZE,
+										   ALLOCSET_DEFAULT_MAXSIZE);
+
+	do
+	{
+		MemoryContextSwitchTo(MessageContext);
+		MemoryContextResetAndDeleteChildren(MessageContext);
+
+		ProcessCompletedNotifies();
+		pgstat_report_stat(false);
+		pgstat_report_activity(STATE_IDLE, NULL);
+
+		shm_mq_receive_stringinfo(command_qh, &msg);
+		msgtype = pq_getmsgbyte(&msg);
+
+		switch (msgtype)
+		{
+			case 'B':
+				{
+					SetCurrentStatementStartTimestamp();
+					exec_bind_message(&msg);
+					break;
+				}
+			case 'D':
+				{
+					int         describe_type;
+					const char *describe_target;
+
+					SetCurrentStatementStartTimestamp();
+
+					describe_type = pq_getmsgbyte(&msg);
+					describe_target = pq_getmsgstring(&msg);
+					pq_getmsgend(&msg);
+
+					switch (describe_type)
+					{
+						case 'S':
+							exec_describe_statement_message(describe_target);
+							break;
+#ifdef TODO
+						case 'P':
+							exec_describe_portal_message(describe_target);
+							break;
+#endif
+						default:
+							ereport(ERROR,
+									(errcode(ERRCODE_PROTOCOL_VIOLATION),
+									 errmsg("invalid DESCRIBE message subtype %d",
+											describe_type)));
+							break;
+					}
+				}
+				break;
+			case 'E':
+				{
+					const char *portal_name;
+					int			max_rows;
+
+					SetCurrentStatementStartTimestamp();
+
+					portal_name = pq_getmsgstring(&msg);
+					max_rows = pq_getmsgint(&msg, 4);
+					pq_getmsgend(&msg);
+
+					exec_execute_message(portal_name, max_rows);
+				}
+				break;
+
+			case 'P':
+				{
+					const char *stmt_name;
+					const char *query_string;
+					int			numParams;
+					Oid		   *paramTypes = NULL;
+
+					SetCurrentStatementStartTimestamp();
+
+					stmt_name = pq_getmsgstring(&msg);
+					query_string = pq_getmsgstring(&msg);
+					numParams = pq_getmsgint(&msg, 2);
+					if (numParams > 0)
+					{
+						int			i;
+
+						paramTypes = palloc(numParams * sizeof(Oid));
+						for (i = 0; i < numParams; i++)
+							paramTypes[i] = pq_getmsgint(&msg, 4);
+					}
+					pq_getmsgend(&msg);
+
+					exec_parse_message(query_string, stmt_name,
+									   paramTypes, numParams);
+					break;
+				}
+			case 'Q':
+				{
+					const char *sql;
+					int save_log_statement;
+					bool save_log_duration;
+					int save_log_min_duration_statement;
+
+					sql = pq_getmsgstring(&msg);
+					pq_getmsgend(&msg);
+
+					/* XXX room for improvement */
+					save_log_statement = log_statement;
+					save_log_duration = log_duration;
+					save_log_min_duration_statement = log_min_duration_statement;
+
+					check_client_encoding_hook = bgsession_check_client_encoding_hook;
+					log_statement = LOGSTMT_NONE;
+					log_duration = false;
+					log_min_duration_statement = -1;
+
+					SetCurrentStatementStartTimestamp();
+					exec_simple_query(sql, 1);
+
+					log_statement = save_log_statement;
+					log_duration = save_log_duration;
+					log_min_duration_statement = save_log_min_duration_statement;
+					check_client_encoding_hook = NULL;
+
+					ReadyForQuery(whereToSendOutput);
+					break;
+				}
+			case 'S':
+				{
+					pq_getmsgend(&msg);
+					finish_xact_command();
+					ReadyForQuery(whereToSendOutput);
+					break;
+				}
+			case 'X':
+				break;
+			default:
+				ereport(ERROR,
+						(errcode(ERRCODE_PROTOCOL_VIOLATION),
+						 errmsg("invalid protocol message type from background session leader: %c",
+								msgtype)));
+				break;
+		}
+	}
+	while (msgtype != 'X');
+}
+
+
+static void
+shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg)
+{
+	shm_mq_result res;
+	Size		nbytes;
+	void	   *data;
+
+	res = shm_mq_receive(qh, &nbytes, &data, false);
+	if (res != SHM_MQ_SUCCESS)
+		elog(ERROR, "shm_mq_receive failed: %d", res);
+
+	initStringInfo(msg);
+	appendBinaryStringInfo(msg, data, nbytes);
+}
+
+
+static void
+bgsession_check_client_encoding_hook(void)
+{
+	elog(ERROR, "cannot set client encoding in background session");
+}
+
+
+static TupleDesc
+TupleDesc_from_RowDescription(StringInfo msg)
+{
+	TupleDesc	tupdesc;
+	int			natts = pq_getmsgint(msg, 2);
+	int			i;
+
+	tupdesc = CreateTemplateTupleDesc(natts, false);
+	for (i = 0; i < natts; i++)
+	{
+		const char *colname;
+		Oid     type_oid;
+		int32	typmod;
+		int16	format;
+
+		colname = pq_getmsgstring(msg);
+		(void) pq_getmsgint(msg, 4);   /* table OID */
+		(void) pq_getmsgint(msg, 2);   /* table attnum */
+		type_oid = pq_getmsgint(msg, 4);
+		(void) pq_getmsgint(msg, 2);   /* type length */
+		typmod = pq_getmsgint(msg, 4);
+		format = pq_getmsgint(msg, 2);
+		(void) format;
+#ifdef TODO
+		/* XXX The protocol sometimes sends 0 (text) if the format is not
+		 * determined yet.  We always use binary, so this check is probably
+		 * not useful. */
+		if (format != 1)
+			elog(ERROR, "format must be binary");
+#endif
+
+		TupleDescInitEntry(tupdesc, i + 1, colname, type_oid, typmod, 0);
+	}
+	return tupdesc;
+}
+
+
+static HeapTuple
+HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg)
+{
+	int			natts = pq_getmsgint(msg, 2);
+	int			i;
+	Datum	   *values;
+	bool	   *nulls;
+	StringInfoData buf;
+
+	Assert(tupdesc);
+
+	if (natts != tupdesc->natts)
+		elog(ERROR, "malformed DataRow");
+
+	values = palloc(natts * sizeof(*values));
+	nulls = palloc(natts * sizeof(*nulls));
+	initStringInfo(&buf);
+
+	for (i = 0; i < natts; i++)
+	{
+		int32 len = pq_getmsgint(msg, 4);
+
+		if (len < 0)
+			nulls[i] = true;
+		else
+		{
+			Oid recvid;
+			Oid typioparams;
+
+			nulls[i] = false;
+
+			getTypeBinaryInputInfo(tupdesc->attrs[i]->atttypid,
+								   &recvid,
+								   &typioparams);
+			resetStringInfo(&buf);
+			appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, len), len);
+			values[i] = OidReceiveFunctionCall(recvid, &buf, typioparams,
+											   tupdesc->attrs[i]->atttypmod);
+		}
+	}
+
+	return heap_form_tuple(tupdesc, values, nulls);
+}
+
+
+static void
+forward_NotifyResponse(StringInfo msg)
+{
+	int32	pid;
+	const char *channel;
+	const char *payload;
+
+	pid = pq_getmsgint(msg, 4);
+	channel = pq_getmsgrawstring(msg);
+	payload = pq_getmsgrawstring(msg);
+	pq_endmessage(msg);
+
+	NotifyMyFrontEnd(channel, payload, pid);
+}
+
+
+static void
+rethrow_errornotice(StringInfo msg)
+{
+	ErrorData   edata;
+
+	pq_parse_errornotice(msg, &edata);
+	edata.elevel = Min(edata.elevel, ERROR);
+	ThrowErrorData(&edata);
+}
+
+
+static void
+invalid_protocol_message(char msgtype)
+{
+	ereport(ERROR,
+			(errcode(ERRCODE_PROTOCOL_VIOLATION),
+			 errmsg("invalid protocol message type from background session: %c",
+					msgtype)));
+}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 599874e..9b607b1 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -180,8 +180,6 @@ static int	errdetail_execute(List *raw_parsetree_list);
 static int	errdetail_params(ParamListInfo params);
 static int	errdetail_abort(void);
 static int	errdetail_recovery_conflict(void);
-static void start_xact_command(void);
-static void finish_xact_command(void);
 static bool IsTransactionExitStmt(Node *parsetree);
 static bool IsTransactionExitStmtList(List *parseTrees);
 static bool IsTransactionStmtList(List *parseTrees);
@@ -869,8 +867,8 @@ pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams)
  *
  * Execute a "simple Query" protocol message.
  */
-static void
-exec_simple_query(const char *query_string)
+void
+exec_simple_query(const char *query_string, int16 format)
 {
 	CommandDest dest = whereToSendOutput;
 	MemoryContext oldcontext;
@@ -963,7 +961,6 @@ exec_simple_query(const char *query_string)
 				   *plantree_list;
 		Portal		portal;
 		DestReceiver *receiver;
-		int16		format;
 
 		/*
 		 * Get the command name for use in status display (it also becomes the
@@ -1054,6 +1051,8 @@ exec_simple_query(const char *query_string)
 		 */
 		PortalStart(portal, NULL, 0, InvalidSnapshot);
 
+		if (format < 0)
+		{
 		/*
 		 * Select the appropriate output format: text unless we are doing a
 		 * FETCH from a binary cursor.  (Pretty grotty to have to do this here
@@ -1074,6 +1073,7 @@ exec_simple_query(const char *query_string)
 					format = 1; /* BINARY */
 			}
 		}
+		}
 		PortalSetResultFormat(portal, 1, &format);
 
 		/*
@@ -1185,7 +1185,7 @@ exec_simple_query(const char *query_string)
  *
  * Execute a "Parse" protocol message.
  */
-static void
+void
 exec_parse_message(const char *query_string,	/* string to execute */
 				   const char *stmt_name,		/* name for prepared stmt */
 				   Oid *paramTypes,		/* parameter types */
@@ -1447,7 +1447,7 @@ exec_parse_message(const char *query_string,	/* string to execute */
  *
  * Process a "Bind" message to create a portal from a prepared statement
  */
-static void
+void
 exec_bind_message(StringInfo input_message)
 {
 	const char *portal_name;
@@ -1829,7 +1829,7 @@ exec_bind_message(StringInfo input_message)
  *
  * Process an "Execute" message for a portal
  */
-static void
+void
 exec_execute_message(const char *portal_name, long max_rows)
 {
 	CommandDest dest;
@@ -2278,7 +2278,7 @@ errdetail_recovery_conflict(void)
  *
  * Process a "Describe" message for a prepared statement
  */
-static void
+void
 exec_describe_statement_message(const char *stmt_name)
 {
 	CachedPlanSource *psrc;
@@ -2422,7 +2422,7 @@ exec_describe_portal_message(const char *portal_name)
 /*
  * Convenience routines for starting/committing a single command.
  */
-static void
+void
 start_xact_command(void)
 {
 	if (!xact_started)
@@ -2442,7 +2442,7 @@ start_xact_command(void)
 	}
 }
 
-static void
+void
 finish_xact_command(void)
 {
 	if (xact_started)
@@ -4066,7 +4066,7 @@ PostgresMain(int argc, char *argv[],
 					if (am_walsender)
 						exec_replication_command(query_string);
 					else
-						exec_simple_query(query_string);
+						exec_simple_query(query_string, -1);
 
 					send_ready_for_query = true;
 				}
diff --git a/src/include/commands/variable.h b/src/include/commands/variable.h
index 8105951..73faff7 100644
--- a/src/include/commands/variable.h
+++ b/src/include/commands/variable.h
@@ -29,6 +29,7 @@ extern bool check_transaction_deferrable(bool *newval, void **extra, GucSource s
 extern bool check_random_seed(double *newval, void **extra, GucSource source);
 extern void assign_random_seed(double newval, void *extra);
 extern const char *show_random_seed(void);
+extern void (*check_client_encoding_hook)(void);
 extern bool check_client_encoding(char **newval, void **extra, GucSource source);
 extern void assign_client_encoding(const char *newval, void *extra);
 extern bool check_session_authorization(char **newval, void **extra, GucSource source);
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
index 8c03acb..6cc0090 100644
--- a/src/include/libpq/pqmq.h
+++ b/src/include/libpq/pqmq.h
@@ -17,6 +17,7 @@
 #include "storage/shm_mq.h"
 
 extern void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh);
+extern void pq_stop_redirect_to_shm_mq(void);
 extern void pq_set_parallel_master(pid_t pid, BackendId backend_id);
 
 extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
diff --git a/src/include/tcop/bgsession.h b/src/include/tcop/bgsession.h
new file mode 100644
index 0000000..70dad45
--- /dev/null
+++ b/src/include/tcop/bgsession.h
@@ -0,0 +1,26 @@
+#ifndef BGSESSION_H
+#define BGSESSION_H
+
+#include "access/tupdesc.h"
+#include "nodes/pg_list.h"
+
+struct BackgroundSession;
+typedef struct BackgroundSession BackgroundSession;
+
+struct BackgroundSessionPreparedStatement;
+typedef struct BackgroundSessionPreparedStatement BackgroundSessionPreparedStatement;
+
+typedef struct BackgroundSessionResult
+{
+	TupleDesc	tupdesc;
+	List	   *tuples;
+	const char *command;
+} BackgroundSessionResult;
+
+BackgroundSession *BackgroundSessionStart(void);
+void BackgroundSessionEnd(BackgroundSession *session);
+BackgroundSessionResult *BackgroundSessionExecute(BackgroundSession *session, const char *sql);
+BackgroundSessionPreparedStatement *BackgroundSessionPrepare(BackgroundSession *session, const char *sql, int nargs, Oid argtypes[], const char *argnames[]);
+BackgroundSessionResult *BackgroundSessionExecutePrepared(BackgroundSessionPreparedStatement *stmt, int nargs, Datum values[], bool nulls[]);
+
+#endif /* BGSESSION_H */
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 7254355..8f2ff8a 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -57,6 +57,12 @@ extern PlannedStmt *pg_plan_query(Query *querytree, int cursorOptions,
 			  ParamListInfo boundParams);
 extern List *pg_plan_queries(List *querytrees, int cursorOptions,
 				ParamListInfo boundParams);
+extern void exec_simple_query(const char *query_string, int16 format);
+extern void exec_parse_message(const char *query_string, const char *stmt_name,
+							   Oid paramTypes[], int numParams);
+extern void exec_bind_message(StringInfo input_message);
+extern void exec_execute_message(const char *portal_name, long max_rows);
+extern void exec_describe_statement_message(const char *stmt_name);
 
 extern bool check_max_stack_depth(int *newval, void **extra, GucSource source);
 extern void assign_max_stack_depth(int newval, void *extra);
@@ -70,6 +76,9 @@ extern void RecoveryConflictInterrupt(ProcSignalReason reason); /* called from S
 extern void ProcessClientReadInterrupt(bool blocked);
 extern void ProcessClientWriteInterrupt(bool blocked);
 
+extern void start_xact_command(void);
+extern void finish_xact_command(void);
+
 extern void process_postgres_switches(int argc, char *argv[],
 						  GucContext ctx, const char **dbname);
 extern void PostgresMain(int argc, char *argv[],
diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile
index 7680d49..9895a6e 100644
--- a/src/pl/plpython/Makefile
+++ b/src/pl/plpython/Makefile
@@ -20,6 +20,7 @@ PGFILEDESC = "PL/Python - procedural language"
 NAME = plpython$(python_majorversion)
 
 OBJS = \
+	plpy_bgsession.o \
 	plpy_cursorobject.o \
 	plpy_elog.o \
 	plpy_exec.o \
@@ -89,6 +90,7 @@ REGRESS = \
 	plpython_quote \
 	plpython_composite \
 	plpython_subtransaction \
+	plpython_bgsession \
 	plpython_drop
 
 REGRESS_PLPYTHON3_MANGLE := $(REGRESS)
diff --git a/src/pl/plpython/expected/plpython_bgsession.out b/src/pl/plpython/expected/plpython_bgsession.out
new file mode 100644
index 0000000..b8dff07
--- /dev/null
+++ b/src/pl/plpython/expected/plpython_bgsession.out
@@ -0,0 +1,188 @@
+CREATE TABLE test1 (a int, b text);
+CREATE FUNCTION bgsession_test() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    for i in range(0, 10):
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+        if i % 2 == 0:
+            a.execute("COMMIT")
+        else:
+            a.execute("ROLLBACK")
+
+return 42
+$$;
+SELECT bgsession_test();
+ bgsession_test 
+----------------
+             42
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+CREATE FUNCTION bgsession_test2() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (11)")
+        rv = a.execute("SELECT * FROM test1")
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+
+return 42
+$$;
+SELECT bgsession_test2();
+INFO:  <PLyResult status=5 nrows=6 rows=[{'a': 0, 'b': None}, {'a': 2, 'b': None}, {'a': 4, 'b': None}, {'a': 6, 'b': None}, {'a': 8, 'b': None}, {'a': 11, 'b': None}]>
+ bgsession_test2 
+-----------------
+              42
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+CREATE FUNCTION bgsession_test3() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$")
+    a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$")
+
+return 42
+$$;
+SELECT bgsession_test3();
+NOTICE:  notice
+ERROR:  error
+CONTEXT:  PL/pgSQL function inline_code_block line 1 at RAISE
+PL/Python function "bgsession_test3"
+CREATE FUNCTION bgsession_test4() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    a.execute("SET client_encoding TO SJIS")
+
+return 42
+$$;
+SELECT bgsession_test4();
+ERROR:  cannot set client encoding in background session
+CONTEXT:  PL/Python function "bgsession_test4"
+TRUNCATE test1;
+CREATE FUNCTION bgsession_test5() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"])
+    a.execute_prepared(plan, [1, "one"])
+    a.execute_prepared(plan, [2, "two"])
+
+return 42
+$$;
+SELECT bgsession_test5();
+ bgsession_test5 
+-----------------
+              42
+(1 row)
+
+SELECT * FROM test1;
+ a |  b  
+---+-----
+ 1 | one
+ 2 | two
+(2 rows)
+
+TRUNCATE test1;
+CREATE FUNCTION bgsession_test7() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+        plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"])
+        a.execute_prepared(plan, [11])
+        plan = a.prepare("SELECT * FROM test1")
+        rv = a.execute_prepared(plan, [])
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+
+return 42
+$$;
+SELECT bgsession_test7();
+INFO:  <PLyResult status=5 nrows=1 rows=[{'a': 11, 'b': None}]>
+ bgsession_test7 
+-----------------
+              42
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+(0 rows)
+
+CREATE FUNCTION bgsession_test8() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+
+return 42
+$$;
+SELECT bgsession_test8();
+ERROR:  background session ended with transaction block open
+CONTEXT:  PL/Python function "bgsession_test8"
+TRUNCATE test1;
+CREATE FUNCTION bgsession_test9a() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+bg = plpy.BackgroundSession()
+GD['bg'] = bg
+bg.execute("BEGIN")
+bg.execute("INSERT INTO test1 VALUES (1)")
+
+return 1
+$$;
+CREATE FUNCTION bgsession_test9b() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+bg = GD['bg']
+bg.execute("INSERT INTO test1 VALUES (2)")
+bg.execute("COMMIT")
+bg.close()
+
+return 2
+$$;
+SELECT bgsession_test9a();
+ bgsession_test9a 
+------------------
+                1
+(1 row)
+
+SELECT bgsession_test9b();
+ bgsession_test9b 
+------------------
+                2
+(1 row)
+
+SELECT * FROM test1;
+ a | b 
+---+---
+ 1 | 
+ 2 | 
+(2 rows)
+
+DROP TABLE test1;
diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out
index 847e4cc..2001b60 100644
--- a/src/pl/plpython/expected/plpython_test.out
+++ b/src/pl/plpython/expected/plpython_test.out
@@ -43,8 +43,9 @@ contents.sort()
 return contents
 $$ LANGUAGE plpythonu;
 select module_contents();
- module_contents 
------------------
+  module_contents  
+-------------------
+ BackgroundSession
  Error
  Fatal
  SPIError
@@ -63,7 +64,7 @@ select module_contents();
  spiexceptions
  subtransaction
  warning
-(18 rows)
+(19 rows)
 
 CREATE FUNCTION elog_test_basic() RETURNS void
 AS $$
diff --git a/src/pl/plpython/plpy_bgsession.c b/src/pl/plpython/plpy_bgsession.c
new file mode 100644
index 0000000..68f7207
--- /dev/null
+++ b/src/pl/plpython/plpy_bgsession.c
@@ -0,0 +1,454 @@
+/*
+ * the PLyBackgroundSession class
+ *
+ * src/pl/plpython/plpy_bgsession.c
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "executor/spi.h"
+#include "parser/parse_type.h"
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+
+#include "plpython.h"
+
+#include "plpy_bgsession.h"
+
+#include "plpy_elog.h"
+#include "plpy_main.h"
+#include "plpy_planobject.h"
+#include "plpy_spi.h"
+
+
+static PyObject *PLyBackgroundSession_new(PyTypeObject *type, PyObject *args, PyObject *kw);
+static void PLyBackgroundSession_dealloc(PyObject *subxact);
+static PyObject *PLyBackgroundSession_enter(PyObject *self, PyObject *unused);
+static PyObject *PLyBackgroundSession_close(PyObject *self, PyObject *args);
+static PyObject *PLyBackgroundSession_execute(PyObject *self, PyObject *args);
+static PyObject *PLyBackgroundSession_prepare(PyObject *self, PyObject *args);
+static PyObject *PLyBackgroundSession_execute_prepared(PyObject *self, PyObject *args);
+
+static char PLyBackgroundSession_doc[] = {
+	"PostgreSQL background session context manager"
+};
+
+static PyMethodDef PLyBackgroundSession_methods[] = {
+	{"close", PLyBackgroundSession_close, METH_VARARGS, NULL},
+	{"__enter__", PLyBackgroundSession_enter, METH_VARARGS, NULL},
+	{"__exit__", PLyBackgroundSession_close, METH_VARARGS, NULL},
+	{"execute", PLyBackgroundSession_execute, METH_VARARGS, NULL},
+	{"prepare", PLyBackgroundSession_prepare, METH_VARARGS, NULL},
+	{"execute_prepared", PLyBackgroundSession_execute_prepared, METH_VARARGS, NULL},
+	{NULL, NULL, 0, NULL}
+};
+
+static PyTypeObject PLyBackgroundSession_Type = {
+	PyVarObject_HEAD_INIT(NULL, 0)
+	"plpy.BackgroundSession",		/* tp_name */
+	sizeof(PLyBackgroundSession_Object),	/* tp_size */
+	0,								/* tp_itemsize */
+
+	/*
+	 * methods
+	 */
+	PLyBackgroundSession_dealloc,	/* tp_dealloc */
+	0,								/* tp_print */
+	0,								/* tp_getattr */
+	0,								/* tp_setattr */
+	0,								/* tp_compare */
+	0,								/* tp_repr */
+	0,								/* tp_as_number */
+	0,								/* tp_as_sequence */
+	0,								/* tp_as_mapping */
+	0,								/* tp_hash */
+	0,								/* tp_call */
+	0,								/* tp_str */
+	0,								/* tp_getattro */
+	0,								/* tp_setattro */
+	0,								/* tp_as_buffer */
+	Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,	/* tp_flags */
+	PLyBackgroundSession_doc,		/* tp_doc */
+	0,								/* tp_traverse */
+	0,								/* tp_clear */
+	0,								/* tp_richcompare */
+	0,								/* tp_weaklistoffset */
+	0,								/* tp_iter */
+	0,								/* tp_iternext */
+	PLyBackgroundSession_methods,	/* tp_tpmethods */
+	0,								/* tp_members */
+	0,								/* tp_getset */
+	0,								/* tp_base */
+	0,								/* tp_dict */
+	0,								/* tp_descr_get */
+	0,								/* tp_descr_set */
+	0,								/* tp_dictoffset */
+	0,								/* tp_init */
+	0,								/* tp_alloc */
+	PLyBackgroundSession_new,		/* tp_new */
+	0,								/* tp_free */
+};
+
+
+int
+PLy_bgsession_init_type(PyObject *module)
+{
+	if (PyType_Ready(&PLyBackgroundSession_Type) < 0)
+		return -1;
+
+	Py_INCREF(&PLyBackgroundSession_Type);
+	if (PyModule_AddObject(module, "BackgroundSession", (PyObject *)&PLyBackgroundSession_Type) < 0)
+		return -1;
+
+	return 0;
+}
+
+static PyObject *
+PLyBackgroundSession_new(PyTypeObject *type, PyObject *args, PyObject *kw)
+{
+	PyObject   *result = type->tp_alloc(type, 0);
+	PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) result;
+
+	bgsession->bgsession = BackgroundSessionStart();
+
+	return result;
+}
+
+/*
+ * Python requires a dealloc function to be defined
+ */
+static void
+PLyBackgroundSession_dealloc(PyObject *self)
+{
+}
+
+/*
+ * bgsession.__enter__() or bgsession.enter()
+ */
+static PyObject *
+PLyBackgroundSession_enter(PyObject *self, PyObject *unused)
+{
+	Py_INCREF(self);
+	return self;
+}
+
+/*
+ * bgsession.close() or bgsession.__exit__(exc_type, exc, tb)
+ */
+static PyObject *
+PLyBackgroundSession_close(PyObject *self, PyObject *args)
+{
+	PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self;
+
+	if (!bgsession->bgsession)
+	{
+		PLy_exception_set(PyExc_ValueError, "this background session has already been closed");
+		return NULL;
+	}
+
+	BackgroundSessionEnd(bgsession->bgsession);
+	bgsession->bgsession = NULL;
+
+	Py_INCREF(Py_None);
+	return Py_None;
+}
+
+static PyObject *
+PLyBackgroundSession_execute(PyObject *self, PyObject *args)
+{
+	PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self;
+	char	   *query;
+
+	if (!bgsession->bgsession)
+	{
+		PLy_exception_set(PyExc_ValueError, "this background session has already been closed");
+		return NULL;
+	}
+
+	if (PyArg_ParseTuple(args, "s:execute", &query))
+	{
+		BackgroundSessionResult *result;
+		HeapTuple  *tuples;
+		ListCell   *lc;
+		int			i;
+		SPITupleTable faketupletable;
+
+		result = BackgroundSessionExecute(bgsession->bgsession, query);
+		if (result->tupdesc)
+		{
+			tuples = palloc(list_length(result->tuples) * sizeof(*tuples));
+			i = 0;
+			foreach (lc, result->tuples)
+			{
+				HeapTuple tuple = (HeapTuple) lfirst(lc);
+				tuples[i++] = tuple;
+			}
+			faketupletable.tupdesc = result->tupdesc;
+			faketupletable.vals = tuples;
+			return PLy_spi_execute_fetch_result(&faketupletable, list_length(result->tuples), SPI_OK_SELECT);
+		}
+		else
+			return PLy_spi_execute_fetch_result(NULL, 0, SPI_OK_UTILITY);
+	}
+	else
+		PLy_exception_set(PLy_exc_error, "background session execute expected a query");
+	return NULL;
+}
+
+// XXX lots of overlap with PLy_spi_prepare
+static PyObject *
+PLyBackgroundSession_prepare(PyObject *self, PyObject *args)
+{
+	PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self;
+	char	   *query;
+	PyObject   *paraminfo = NULL;
+	BackgroundSessionPreparedStatement *bgstmt;
+	int			nargs = 0;
+	const char **argnames = NULL;
+	PLyPlanObject *plan;
+	PyObject   *volatile optr = NULL;
+	volatile MemoryContext oldcontext;
+	int			i;
+	PLyExecutionContext *exec_ctx = PLy_current_execution_context();
+	PyObject *keys;
+
+	if (!bgsession->bgsession)
+	{
+		PLy_exception_set(PyExc_ValueError, "this background session has already been closed");
+		return NULL;
+	}
+
+	if (!PyArg_ParseTuple(args, "s|O:prepare", &query, &paraminfo))
+		return NULL;
+
+	if (paraminfo &&
+		!PySequence_Check(paraminfo) && !PyMapping_Check(paraminfo))
+	{
+		PLy_exception_set(PyExc_TypeError,
+						  "second argument of prepare must be a sequence or mapping");
+		return NULL;
+	}
+
+	if ((plan = (PLyPlanObject *) PLy_plan_new()) == NULL)
+		return NULL;
+
+	plan->mcxt = AllocSetContextCreate(TopMemoryContext,
+									   "PL/Python background plan context",
+									   ALLOCSET_DEFAULT_MINSIZE,
+									   ALLOCSET_DEFAULT_INITSIZE,
+									   ALLOCSET_DEFAULT_MAXSIZE);
+
+	oldcontext = MemoryContextSwitchTo(plan->mcxt);
+
+	if (!paraminfo)
+		nargs = 0;
+	else if (PySequence_Check(paraminfo))
+		nargs = PySequence_Length(paraminfo);
+	else
+		nargs = PyMapping_Length(paraminfo);
+
+	plan->nargs = nargs;
+	plan->types = nargs ? palloc(sizeof(Oid) * nargs) : NULL;
+	plan->values = nargs ? palloc(sizeof(Datum) * nargs) : NULL;
+	plan->args = nargs ? palloc(sizeof(PLyTypeInfo) * nargs) : NULL;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	if (PyMapping_Check(paraminfo))
+	{
+		argnames = palloc(nargs * sizeof(char *));
+		keys = PyMapping_Keys(paraminfo);
+	}
+	else
+	{
+		argnames = NULL;
+		keys = NULL;
+	}
+
+	for (i = 0; i < nargs; i++)
+	{
+		PLy_typeinfo_init(&plan->args[i], plan->mcxt);
+		plan->values[i] = PointerGetDatum(NULL);
+	}
+
+	for (i = 0; i < nargs; i++)
+	{
+		char	   *sptr;
+		HeapTuple	typeTup;
+		Oid			typeId;
+		int32		typmod;
+
+		if (keys)
+		{
+			PyObject *key;
+			char *keystr;
+
+			key = PySequence_GetItem(keys, i);
+			argnames[i] = keystr = PyString_AsString(key);
+			optr = PyMapping_GetItemString(paraminfo, keystr);
+			Py_DECREF(key);
+		}
+		else
+			optr = PySequence_GetItem(paraminfo, i);
+
+		if (PyString_Check(optr))
+			sptr = PyString_AsString(optr);
+		else if (PyUnicode_Check(optr))
+			sptr = PLyUnicode_AsString(optr);
+		else
+		{
+			ereport(ERROR,
+					(errmsg("background session prepare: type name at ordinal position %d is not a string", i)));
+			sptr = NULL;	/* keep compiler quiet */
+		}
+
+		/********************************************************
+		 * Resolve argument type names and then look them up by
+		 * oid in the system cache, and remember the required
+		 *information for input conversion.
+		 ********************************************************/
+
+		parseTypeString(sptr, &typeId, &typmod, false);
+
+		typeTup = SearchSysCache1(TYPEOID,
+								  ObjectIdGetDatum(typeId));
+		if (!HeapTupleIsValid(typeTup))
+			elog(ERROR, "cache lookup failed for type %u", typeId);
+
+		Py_DECREF(optr);
+
+		/*
+		 * set optr to NULL, so we won't try to unref it again in case of
+		 * an error
+		 */
+		optr = NULL;
+
+		plan->types[i] = typeId;
+		PLy_output_datum_func(&plan->args[i], typeTup, exec_ctx->curr_proc->langid, exec_ctx->curr_proc->trftypes);
+		ReleaseSysCache(typeTup);
+	}
+
+	bgstmt = BackgroundSessionPrepare(bgsession->bgsession, query, nargs, plan->types, argnames);
+
+	plan->bgstmt = bgstmt;
+
+	return (PyObject *) plan;
+}
+
+static PyObject *
+PLyBackgroundSession_execute_prepared(PyObject *self, PyObject *args)
+{
+	PLyBackgroundSession_Object *bgsession pg_attribute_unused() = (PLyBackgroundSession_Object *) self;
+	PyObject   *ob;
+	PLyPlanObject *plan;
+	PyObject   *list = NULL;
+	int			nargs;
+	bool	   *nulls;
+	BackgroundSessionResult *result;
+	HeapTuple  *tuples;
+	ListCell   *lc;
+	int			i;
+	SPITupleTable faketupletable;
+
+	if (!bgsession->bgsession)
+	{
+		PLy_exception_set(PyExc_ValueError, "this background session has already been closed");
+		return NULL;
+	}
+
+	if (!PyArg_ParseTuple(args, "O|O:execute_prepared", &ob, &list))
+		return NULL;
+
+	if (!is_PLyPlanObject(ob))
+	{
+		PLy_exception_set(PyExc_TypeError,
+						  "first argument of execute_prepared must be a plan");
+		return NULL;
+	}
+
+	plan = (PLyPlanObject *) ob;
+
+	if (list && (!PySequence_Check(list)))
+	{
+		PLy_exception_set(PyExc_TypeError,
+						  "second argument of execute_prepared must be a sequence");
+		return NULL;
+	}
+
+	nargs = list ? PySequence_Length(list) : 0;
+
+	if (nargs != plan->nargs)
+	{
+		char	   *sv;
+		PyObject   *so = PyObject_Str(list);
+
+		if (!so)
+			PLy_elog(ERROR, "could not execute plan");
+		sv = PyString_AsString(so);
+		PLy_exception_set_plural(PyExc_TypeError,
+							  "Expected sequence of %d argument, got %d: %s",
+							 "Expected sequence of %d arguments, got %d: %s",
+								 plan->nargs,
+								 plan->nargs, nargs, sv);
+		Py_DECREF(so);
+
+		return NULL;
+	}
+
+	nulls = palloc(nargs * sizeof(*nulls));
+
+	for (i = 0; i < nargs; i++)
+	{
+		PyObject   *elem;
+
+		elem = PySequence_GetItem(list, i);
+		if (elem != Py_None)
+		{
+			PG_TRY();
+			{
+				plan->values[i] =
+					plan->args[i].out.d.func(&(plan->args[i].out.d),
+											 -1,
+											 elem,
+											 false);
+			}
+			PG_CATCH();
+			{
+				Py_DECREF(elem);
+				PG_RE_THROW();
+			}
+			PG_END_TRY();
+
+			Py_DECREF(elem);
+			nulls[i] = false;
+		}
+		else
+		{
+			Py_DECREF(elem);
+			plan->values[i] =
+				InputFunctionCall(&(plan->args[i].out.d.typfunc),
+								  NULL,
+								  plan->args[i].out.d.typioparam,
+								  -1);
+			nulls[i] = true;
+		}
+	}
+
+	result = BackgroundSessionExecutePrepared(plan->bgstmt, nargs, plan->values, nulls);
+	if (result->tupdesc)
+	{
+		tuples = palloc(list_length(result->tuples) * sizeof(*tuples));
+		i = 0;
+		foreach (lc, result->tuples)
+		{
+			HeapTuple tuple = (HeapTuple) lfirst(lc);
+			tuples[i++] = tuple;
+		}
+		faketupletable.tupdesc = result->tupdesc;
+		faketupletable.vals = tuples;
+		return PLy_spi_execute_fetch_result(&faketupletable, list_length(result->tuples), SPI_OK_SELECT);
+	}
+	else
+		return PLy_spi_execute_fetch_result(NULL, 0, SPI_OK_UTILITY);
+}
diff --git a/src/pl/plpython/plpy_bgsession.h b/src/pl/plpython/plpy_bgsession.h
new file mode 100644
index 0000000..39daca2
--- /dev/null
+++ b/src/pl/plpython/plpy_bgsession.h
@@ -0,0 +1,18 @@
+/*
+ * src/pl/plpython/plpy_bgsession.h
+ */
+
+#ifndef PLPY_BGSESSION_H
+#define PLPY_BGSESSION_H
+
+#include "tcop/bgsession.h"
+
+typedef struct PLyBackgroundSession_Object
+{
+	PyObject_HEAD
+	BackgroundSession *bgsession;
+} PLyBackgroundSession_Object;
+
+extern int PLy_bgsession_init_type(PyObject *module);
+
+#endif   /* PLPY_BGSESSION_H */
diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h
index 10426c4..b7ee34c 100644
--- a/src/pl/plpython/plpy_main.h
+++ b/src/pl/plpython/plpy_main.h
@@ -7,6 +7,8 @@
 
 #include "plpy_procedure.h"
 
+#include "tcop/bgsession.h"
+
 /* the interpreter's globals dict */
 extern PyObject *PLy_interp_globals;
 
@@ -19,6 +21,7 @@ typedef struct PLyExecutionContext
 {
 	PLyProcedure *curr_proc;	/* the currently executing procedure */
 	MemoryContext scratch_ctx;	/* a context for things like type I/O */
+	BackgroundSession *bgsession;
 	struct PLyExecutionContext *next;	/* previous stack level */
 } PLyExecutionContext;
 
diff --git a/src/pl/plpython/plpy_planobject.c b/src/pl/plpython/plpy_planobject.c
index 16c39a0..006a3af 100644
--- a/src/pl/plpython/plpy_planobject.c
+++ b/src/pl/plpython/plpy_planobject.c
@@ -77,6 +77,7 @@ PLy_plan_new(void)
 		return NULL;
 
 	ob->plan = NULL;
+	ob->bgstmt = NULL;
 	ob->nargs = 0;
 	ob->types = NULL;
 	ob->values = NULL;
diff --git a/src/pl/plpython/plpy_planobject.h b/src/pl/plpython/plpy_planobject.h
index c675592..fba814a 100644
--- a/src/pl/plpython/plpy_planobject.h
+++ b/src/pl/plpython/plpy_planobject.h
@@ -6,6 +6,7 @@
 #define PLPY_PLANOBJECT_H
 
 #include "executor/spi.h"
+#include "tcop/bgsession.h"
 #include "plpy_typeio.h"
 
 
@@ -13,6 +14,7 @@ typedef struct PLyPlanObject
 {
 	PyObject_HEAD
 	SPIPlanPtr	plan;
+	BackgroundSessionPreparedStatement *bgstmt;
 	int			nargs;
 	Oid		   *types;
 	Datum	   *values;
diff --git a/src/pl/plpython/plpy_plpymodule.c b/src/pl/plpython/plpy_plpymodule.c
index d80dc51..06848bc 100644
--- a/src/pl/plpython/plpy_plpymodule.c
+++ b/src/pl/plpython/plpy_plpymodule.c
@@ -13,6 +13,7 @@
 
 #include "plpy_plpymodule.h"
 
+#include "plpy_bgsession.h"
 #include "plpy_cursorobject.h"
 #include "plpy_elog.h"
 #include "plpy_planobject.h"
@@ -134,6 +135,8 @@ PyInit_plpy(void)
 		return NULL;
 
 	PLy_add_exceptions(m);
+	if (PLy_bgsession_init_type(plpy) < 0)
+		return NULL;
 
 	return m;
 }
@@ -164,6 +167,8 @@ PLy_init_plpy(void)
 #else
 	plpy = Py_InitModule("plpy", PLy_methods);
 	PLy_add_exceptions(plpy);
+	if (PLy_bgsession_init_type(plpy) < 0)
+		PLy_elog(ERROR, "could not initialize BackgroundSession type");
 #endif
 
 	/* PyDict_SetItemString(plpy, "PlanType", (PyObject *) &PLy_PlanType); */
diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c
index b082d01..ec9eeec 100644
--- a/src/pl/plpython/plpy_spi.c
+++ b/src/pl/plpython/plpy_spi.c
@@ -31,8 +31,6 @@
 
 static PyObject *PLy_spi_execute_query(char *query, long limit);
 static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit);
-static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable,
-							 uint64 rows, int status);
 static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata);
 
 
@@ -292,6 +290,7 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit)
 		rv = SPI_execute_plan(plan->plan, plan->values, nulls,
 							  exec_ctx->curr_proc->fn_readonly, limit);
 		ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+		SPI_freetuptable(SPI_tuptable);
 
 		if (nargs > 0)
 			pfree(nulls);
@@ -361,6 +360,7 @@ PLy_spi_execute_query(char *query, long limit)
 		pg_verifymbstr(query, strlen(query), false);
 		rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit);
 		ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv);
+		SPI_freetuptable(SPI_tuptable);
 
 		PLy_spi_subtransaction_commit(oldcontext, oldowner);
 	}
@@ -383,7 +383,7 @@ PLy_spi_execute_query(char *query, long limit)
 	return ret;
 }
 
-static PyObject *
+PyObject *
 PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
 {
 	PLyResultObject *result;
@@ -470,7 +470,6 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status)
 		PG_END_TRY();
 
 		MemoryContextDelete(cxt);
-		SPI_freetuptable(tuptable);
 	}
 
 	return (PyObject *) result;
diff --git a/src/pl/plpython/plpy_spi.h b/src/pl/plpython/plpy_spi.h
index b042794..9ed37e5 100644
--- a/src/pl/plpython/plpy_spi.h
+++ b/src/pl/plpython/plpy_spi.h
@@ -5,12 +5,15 @@
 #ifndef PLPY_SPI_H
 #define PLPY_SPI_H
 
+#include "executor/spi.h"
 #include "utils/palloc.h"
 #include "utils/resowner.h"
 
 extern PyObject *PLy_spi_prepare(PyObject *self, PyObject *args);
 extern PyObject *PLy_spi_execute(PyObject *self, PyObject *args);
 
+extern PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status);
+
 typedef struct PLyExceptionEntry
 {
 	int			sqlstate;		/* hash key, must be first */
diff --git a/src/pl/plpython/sql/plpython_bgsession.sql b/src/pl/plpython/sql/plpython_bgsession.sql
new file mode 100644
index 0000000..5c33ab2
--- /dev/null
+++ b/src/pl/plpython/sql/plpython_bgsession.sql
@@ -0,0 +1,148 @@
+CREATE TABLE test1 (a int, b text);
+
+CREATE FUNCTION bgsession_test() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    for i in range(0, 10):
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+        if i % 2 == 0:
+            a.execute("COMMIT")
+        else:
+            a.execute("ROLLBACK")
+
+return 42
+$$;
+
+SELECT bgsession_test();
+
+SELECT * FROM test1;
+
+
+CREATE FUNCTION bgsession_test2() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (11)")
+        rv = a.execute("SELECT * FROM test1")
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+
+return 42
+$$;
+
+SELECT bgsession_test2();
+
+SELECT * FROM test1;
+
+
+CREATE FUNCTION bgsession_test3() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$")
+    a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$")
+
+return 42
+$$;
+
+SELECT bgsession_test3();
+
+
+CREATE FUNCTION bgsession_test4() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    a.execute("SET client_encoding TO SJIS")
+
+return 42
+$$;
+
+SELECT bgsession_test4();
+
+
+TRUNCATE test1;
+
+CREATE FUNCTION bgsession_test5() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"])
+    a.execute_prepared(plan, [1, "one"])
+    a.execute_prepared(plan, [2, "two"])
+
+return 42
+$$;
+
+SELECT bgsession_test5();
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+CREATE FUNCTION bgsession_test7() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+        plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"])
+        a.execute_prepared(plan, [11])
+        plan = a.prepare("SELECT * FROM test1")
+        rv = a.execute_prepared(plan, [])
+        plpy.info(rv)
+        a.execute("ROLLBACK")
+
+return 42
+$$;
+
+SELECT bgsession_test7();
+
+SELECT * FROM test1;
+
+
+CREATE FUNCTION bgsession_test8() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+        a.execute("BEGIN")
+
+return 42
+$$;
+
+SELECT bgsession_test8();
+
+
+TRUNCATE test1;
+
+CREATE FUNCTION bgsession_test9a() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+bg = plpy.BackgroundSession()
+GD['bg'] = bg
+bg.execute("BEGIN")
+bg.execute("INSERT INTO test1 VALUES (1)")
+
+return 1
+$$;
+
+CREATE FUNCTION bgsession_test9b() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+bg = GD['bg']
+bg.execute("INSERT INTO test1 VALUES (2)")
+bg.execute("COMMIT")
+bg.close()
+
+return 2
+$$;
+
+SELECT bgsession_test9a();
+SELECT bgsession_test9b();
+
+SELECT * FROM test1;
+
+
+DROP TABLE test1;
-- 
2.10.2

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to