Attached is a patch that adds two new concepts: parallel mode, and
parallel contexts.  The idea of this code is to provide a framework
for specific parallel things that you want to do, such as parallel
sequential scan or parallel sort.  When you're in parallel mode,
certain operations - like DDL, and anything that would update the
command counter - are prohibited.  But you gain the ability to create
a parallel context, which in turn can be used to fire up parallel
workers.  And if you do that, then your snapshot, combo CID hash, and
GUC values will be copied to the worker, which is handy.
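
To make the parallel-mode restriction concrete, here is a minimal
standalone sketch of the gating idea.  It is not code from the patch:
every sketch_* name is hypothetical, a plain flag stands in for the
real bookkeeping, and a false return stands in for ereport(ERROR).

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the backend's "in parallel mode" state. */
static bool sketch_parallel_mode = false;

static void
sketch_enter_parallel_mode(void)
{
	assert(!sketch_parallel_mode);	/* this sketch does not nest */
	sketch_parallel_mode = true;
}

static void
sketch_exit_parallel_mode(void)
{
	assert(sketch_parallel_mode);
	sketch_parallel_mode = false;
}

static bool
sketch_in_parallel_mode(void)
{
	return sketch_parallel_mode;
}

/*
 * A guarded operation.  While parallel mode is active the command counter
 * must not move, so the request is refused and the counter is untouched.
 * Returns true if the counter was advanced.
 */
static bool
sketch_command_counter_increment(int *command_id)
{
	if (sketch_in_parallel_mode())
		return false;
	++*command_id;
	return true;
}
```

The point is only that the check sits inside the guarded operation
itself, so callers cannot forget it.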

This patch is very much half-baked.  Among the things that aren't right yet:

- There's no handling of heavyweight locking, so I'm quite sure it'll
be possible to cause undetected deadlocks if you work at it.  There
are some existing threads on this topic and perhaps we can incorporate
one of those concepts into this patch, but this version does not.
- There's no provision for copying the parent's XID and sub-XIDs, if
any, to the background workers, which means that if you use this and
your transaction has written data, you will get wrong answers, because
TransactionIdIsCurrentTransactionId() will do the wrong thing.
- There's no really deep integration with the transaction system yet.
Previous discussions seem to point toward the need to do various types
of coordinated cleanup when the parallel phase is done, or when an
error happens.  In particular, you probably don't want the abort
record to get written while there are still possibly backends that are
part of that transaction doing work; and you certainly don't want
files created by the current transaction to get removed while some
other backend is still writing them.  The right way to work all of
this out needs some deep thought; agreeing on what the design should
be is probably harder than implementing it.
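
The XID problem in the second item can be illustrated with a toy
model.  This is a standalone sketch, not the real
TransactionIdIsCurrentTransactionId(); the SketchXid and
SketchXactState names are made up, and the "transaction state" is
just an array of XIDs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t SketchXid;

/* Hypothetical per-process view of "XIDs that belong to my transaction". */
typedef struct SketchXactState
{
	const SketchXid *xids;		/* top-level XID followed by any sub-XIDs */
	size_t		nxids;
} SketchXactState;

/* Does xid belong to the transaction as this process understands it? */
static bool
sketch_xid_is_current(const SketchXactState *state, SketchXid xid)
{
	for (size_t i = 0; i < state->nxids; ++i)
		if (state->xids[i] == xid)
			return true;
	return false;
}
```

A worker that starts with an empty list answers "no" for an XID that
the parent would recognize as its own, so it would judge the parent's
uncommitted writes by the rules for some other in-progress
transaction.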

Despite the above, I think this does a fairly good job laying out how
I believe parallelism can be made to work in PostgreSQL: copy a bunch
of state from the user backend to the parallel workers, compute for a
while, and then shut everything down.  While parallelism is running,
forbid changes to state that's already been synchronized, so that
things don't get out of step.  I think the patch shows how the
act of synchronizing state from the master to the workers can be made
quite modular and painless, even though it doesn't synchronize
everything relevant.  I'd really appreciate any design thoughts anyone
may have on how to fix the problems mentioned above, how to fix any
other problems you foresee, or even just a list of reasons why you
think this will blow up.
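
As a rough illustration of the "copy state, then look it up by key"
pattern, here is a standalone sketch of a keyed table of contents
over a flat buffer.  It only mimics the allocate/insert/lookup shape
used with shm_toc in the patch; the SketchToc type, its fixed sizes,
and the sketch_* functions are assumptions made for the example, and
a struct copy stands in for a second process mapping the segment.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_MAX_KEYS		8
#define SKETCH_DATA_BYTES	256

/* Hypothetical flat segment: a small key table followed by raw bytes. */
typedef struct SketchToc
{
	size_t		nentries;
	size_t		used;			/* bytes of data[] handed out so far */
	struct
	{
		uint64_t	key;
		size_t		offset;		/* offset into data[], not a pointer */
	}			entry[SKETCH_MAX_KEYS];
	unsigned char data[SKETCH_DATA_BYTES];
} SketchToc;

/* Reserve nbytes (rounded up to 8) inside the segment; NULL if it is full. */
static void *
sketch_toc_allocate(SketchToc *toc, size_t nbytes)
{
	size_t		aligned;
	void	   *chunk;

	if (nbytes > SKETCH_DATA_BYTES)
		return NULL;
	aligned = (nbytes + 7) & ~(size_t) 7;
	if (aligned > SKETCH_DATA_BYTES - toc->used)
		return NULL;
	chunk = toc->data + toc->used;
	toc->used += aligned;
	return chunk;
}

/* Publish an allocated chunk under a key; returns 0 if the table is full. */
static int
sketch_toc_insert(SketchToc *toc, uint64_t key, void *chunk)
{
	if (toc->nentries >= SKETCH_MAX_KEYS)
		return 0;
	toc->entry[toc->nentries].key = key;
	toc->entry[toc->nentries].offset =
		(size_t) ((unsigned char *) chunk - toc->data);
	toc->nentries++;
	return 1;
}

/* Find the chunk stored under a key, or NULL if no such key exists. */
static void *
sketch_toc_lookup(SketchToc *toc, uint64_t key)
{
	for (size_t i = 0; i < toc->nentries; ++i)
		if (toc->entry[i].key == key)
			return toc->data + toc->entry[i].offset;
	return NULL;
}
```

Storing offsets rather than pointers is what lets a lookup work on a
copy of the buffer at a different address, which is the situation a
worker is in when it attaches to the segment.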

What I think is that we're really pretty close to doing real parallelism,
and that this is probably the last major piece of infrastructure that
we need in order to support parallel execution in a reasonable way.
That's a pretty bold statement, but I believe it to be true: despite
the limitations of the current version of this patch, I think we're
very close to being able to sit down and code up a parallel algorithm
in PostgreSQL and have that not be all that hard.  Once we get the
first one, I expect a whole bunch more to come together far more
quickly than the first one did.

I would be remiss if I failed to mention that this patch includes work
by my colleagues Amit Kapila, Rushabh Lathia, and Jeevan Chalke, as
well as my former colleague Noah Misch; and that it would not have
been possible without the patient support of EnterpriseDB management.

Thanks,

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/contrib/parallel_dummy/Makefile b/contrib/parallel_dummy/Makefile
new file mode 100644
index 0000000..de00f50
--- /dev/null
+++ b/contrib/parallel_dummy/Makefile
@@ -0,0 +1,19 @@
+MODULE_big = parallel_dummy
+OBJS = parallel_dummy.o $(WIN32RES)
+PGFILEDESC = "parallel_dummy - dummy use of parallel infrastructure"
+
+EXTENSION = parallel_dummy
+DATA = parallel_dummy--1.0.sql
+
+REGRESS = parallel_dummy
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/parallel_dummy
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/parallel_dummy/parallel_dummy--1.0.sql b/contrib/parallel_dummy/parallel_dummy--1.0.sql
new file mode 100644
index 0000000..2a7251c
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy--1.0.sql
@@ -0,0 +1,7 @@
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION parallel_dummy" to load this file. \quit
+
+CREATE FUNCTION parallel_dummy(sleep_time pg_catalog.int4,
+							  nworkers pg_catalog.int4)
+    RETURNS pg_catalog.void STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/parallel_dummy/parallel_dummy.c b/contrib/parallel_dummy/parallel_dummy.c
new file mode 100644
index 0000000..99b830f
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy.c
@@ -0,0 +1,82 @@
+/*--------------------------------------------------------------------------
+ *
+ * parallel_dummy.c
+ *		Test harness code for parallel mode code.
+ *
+ * Copyright (C) 2013-2014, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/parallel_dummy/parallel_dummy.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/parallel.h"
+#include "access/xact.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "utils/builtins.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(parallel_dummy);
+
+#define		PARALLEL_DUMMY_KEY			1
+
+typedef struct {
+	int32		sleep_time;
+} ParallelDummyInfo;
+
+void		_PG_init(void);
+void		worker_main(shm_toc *toc);
+
+Datum
+parallel_dummy(PG_FUNCTION_ARGS)
+{
+	int32		sleep_time = PG_GETARG_INT32(0);
+	int32		nworkers = PG_GETARG_INT32(1);
+	bool		already_in_parallel_mode = IsInParallelMode();
+	ParallelContext *pcxt;
+	ParallelDummyInfo *info;
+
+	if (nworkers < 1)
+		ereport(ERROR,
+				(errmsg("number of parallel workers must be positive")));
+
+	if (!already_in_parallel_mode)
+		EnterParallelMode();
+
+	pcxt = CreateParallelContextForExtension("parallel_dummy", "worker_main",
+											 nworkers);
+	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelDummyInfo));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+	InitializeParallelDSM(pcxt);
+	info = shm_toc_allocate(pcxt->toc, sizeof(ParallelDummyInfo));
+	info->sleep_time = sleep_time;
+	shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info);
+	LaunchParallelWorkers(pcxt);
+
+	/* here's where we do the "real work" ... */
+	DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) sleep_time));
+
+	DestroyParallelContext(pcxt);
+
+	if (!already_in_parallel_mode)
+		ExitParallelMode();
+
+	PG_RETURN_VOID();
+}
+
+void
+worker_main(shm_toc *toc)
+{
+	ParallelDummyInfo *info;
+
+	info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY);
+	Assert(info != NULL);
+
+	/* here's where we do the "real work" ... */
+	DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) info->sleep_time));
+}
diff --git a/contrib/parallel_dummy/parallel_dummy.control b/contrib/parallel_dummy/parallel_dummy.control
new file mode 100644
index 0000000..90bae3f
--- /dev/null
+++ b/contrib/parallel_dummy/parallel_dummy.control
@@ -0,0 +1,4 @@
+comment = 'Dummy parallel code'
+default_version = '1.0'
+module_pathname = '$libdir/parallel_dummy'
+relocatable = true
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index df4853b..31efa37 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2636,6 +2636,16 @@ heap_delete(Relation relation, ItemPointer tid,
 
 	Assert(ItemPointerIsValid(tid));
 
+	/*
+	 * Forbid this during a parallel operation, lest it allocate a combocid.
+	 * Other workers might need that combocid for visibility checks, and we
+	 * have no provision for broadcasting it to them.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot delete tuples during a parallel operation")));
+
 	block = ItemPointerGetBlockNumber(tid);
 	buffer = ReadBuffer(relation, block);
 	page = BufferGetPage(buffer);
@@ -3077,6 +3087,16 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 	Assert(ItemPointerIsValid(otid));
 
 	/*
+	 * Forbid this during a parallel operation, lest it allocate a combocid.
+	 * Other workers might need that combocid for visibility checks, and we
+	 * have no provision for broadcasting it to them.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot update tuples during a parallel operation")));
+
+	/*
 	 * Fetch the list of attributes to be checked for HOT update.  This is
 	 * wasted effort if we fail to update or have to put the new tuple on a
 	 * different page.  But we must compute the list before obtaining buffer
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 9d4d5db..94455b2 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/access/transam
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = clog.o commit_ts.o multixact.o rmgr.o slru.o subtrans.o \
+OBJS = clog.o commit_ts.o multixact.o parallel.o rmgr.o slru.o subtrans.o \
 	timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \
 	xact.o xlog.o xlogarchive.o xlogfuncs.o \
 	xloginsert.o xlogreader.o xlogutils.o
diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel
new file mode 100644
index 0000000..e427275
--- /dev/null
+++ b/src/backend/access/transam/README.parallel
@@ -0,0 +1,36 @@
+Overview
+========
+
+Before beginning any parallel operation, call EnterParallelMode(); after all
+parallel operations are completed, call ExitParallelMode().  These functions
+don't launch any workers or directly enable parallelism, but they put in place
+a variety of prohibitions required to make parallelism safe.
+
+To actually parallelize a particular operation, use a ParallelContext.  This
+establishes a dynamic shared memory segment and registers dynamic background
+workers which will attach to that segment.  We arrange to synchronize various
+pieces of state - such as, most simply, the database and user OIDs - from the
+backend that is initiating parallelism to all of the background workers
+launched via a ParallelContext.  The basic coding pattern looks like this:
+
+	EnterParallelMode();		/* prohibit unsafe state changes */
+
+	pcxt = CreateParallelContext(entrypoint, nworkers);
+
+	/* Allow space for application-specific data here. */
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, keys);
+
+	InitializeParallelDSM(pcxt);	/* create DSM and copy state to it */
+
+	/* Store the data for which we reserved space. */
+	space = shm_toc_allocate(pcxt->toc, size);
+	shm_toc_insert(pcxt->toc, key, space);
+
+	LaunchParallelWorkers(pcxt);
+
+	/* do parallel stuff */
+
+	DestroyParallelContext(pcxt);
+
+	ExitParallelMode();
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
new file mode 100644
index 0000000..eeae591
--- /dev/null
+++ b/src/backend/access/transam/parallel.c
@@ -0,0 +1,787 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel.c
+ *	  Infrastructure for launching parallel workers
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/transam/parallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/parallel.h"
+#include "commands/async.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/sinval.h"
+#include "storage/spin.h"
+#include "utils/combocid.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "utils/snapmgr.h"
+
+/*
+ * We don't want to waste a lot of memory on an error queue which, most of
+ * the time, will process only a handful of small messages.  However, it is
+ * desirable to make it large enough that a typical ErrorResponse can be sent
+ * without blocking.  That way, a worker that errors out can write the whole
+ * message into the queue and terminate without waiting for the user backend.
+ */
+#define	PARALLEL_ERROR_QUEUE_SIZE			16384
+
+/* Magic number for parallel context TOC. */
+#define PARALLEL_MAGIC						0x50477c7c
+
+/*
+ * Magic numbers for parallel state sharing.  Higher-level code should use
+ * smaller values, leaving these very large ones for use by this module.
+ */
+#define PARALLEL_KEY_FIXED					UINT64CONST(0xFFFFFFFFFFFF0001)
+#define PARALLEL_KEY_ERROR_QUEUE			UINT64CONST(0xFFFFFFFFFFFF0002)
+#define PARALLEL_KEY_GUC					UINT64CONST(0xFFFFFFFFFFFF0003)
+#define PARALLEL_KEY_COMBO_CID				UINT64CONST(0xFFFFFFFFFFFF0004)
+#define PARALLEL_KEY_TRANSACTION_SNAPSHOT	UINT64CONST(0xFFFFFFFFFFFF0005)
+#define PARALLEL_KEY_ACTIVE_SNAPSHOT		UINT64CONST(0xFFFFFFFFFFFF0006)
+#define PARALLEL_KEY_EXTENSION_TRAMPOLINE	UINT64CONST(0xFFFFFFFFFFFF0007)
+
+/* Fixed-size parallel state. */
+typedef struct FixedParallelState
+{
+	/* Fixed-size state that workers must restore. */
+	Oid			database_id;
+	Oid			authenticated_user_id;
+	Oid			current_user_id;
+	int			sec_context;
+	PGPROC	   *parallel_master_pgproc;
+	pid_t		parallel_master_pid;
+	BackendId	parallel_master_backend_id;
+
+	/* Entrypoint for parallel workers. */
+	parallel_worker_main_type	entrypoint;
+
+	/* Track whether workers have attached. */
+	slock_t		mutex;
+	int			workers_expected;
+	int			workers_attached;
+} FixedParallelState;
+
+/*
+ * Our parallel worker number.  We initialize this to -1, meaning that we are
+ * not a parallel worker.  In parallel workers, it will be set to a value >= 0
+ * and < the number of workers before any user code is invoked; each parallel
+ * worker will get a different parallel worker number.
+ */
+int ParallelWorkerNumber = -1;
+
+/* Is there a parallel message pending which we need to receive? */
+bool ParallelMessagePending = false;
+
+/* List of active parallel contexts. */
+static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
+
+/* Private functions. */
+static void HandleParallelMessages(void);
+static void HandleParallelMessage(StringInfo msg);
+static void ParallelMain(Datum main_arg);
+static void ParallelExtensionTrampoline(shm_toc *toc);
+static void handle_sigterm(SIGNAL_ARGS);
+
+/*
+ * Establish a new parallel context.  This should be done after entering
+ * parallel mode, and (unless there is an error) the context should be
+ * destroyed before exiting the current subtransaction.
+ */
+ParallelContext *
+CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
+{
+	MemoryContext	oldcontext;
+	ParallelContext	*pcxt;
+
+	/* It is unsafe to create a parallel context if not in parallel mode. */
+	Assert(IsInParallelMode());
+
+	/* Number of workers should be positive. */
+	Assert(nworkers > 0);
+
+	/* We might be running in a very short-lived memory context. */
+	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+	/* Initialize a new ParallelContext. */
+	pcxt = palloc0(sizeof(ParallelContext));
+	pcxt->subid = GetCurrentSubTransactionId();
+	pcxt->nworkers = nworkers;
+	pcxt->entrypoint = entrypoint;
+	shm_toc_initialize_estimator(&pcxt->estimator);
+	dlist_push_head(&pcxt_list, &pcxt->node);
+
+	/* Restore previous memory context. */
+	MemoryContextSwitchTo(oldcontext);
+
+	return pcxt;
+}
+
+/*
+ * Establish a new parallel context that calls a function provided by an
+ * extension.  This works around the fact that the library might get mapped
+ * at a different address in each backend.
+ */
+ParallelContext *
+CreateParallelContextForExtension(char *library_name, char *function_name,
+								  int nworkers)
+{
+	MemoryContext	oldcontext;
+	ParallelContext *pcxt;
+
+	/* We might be running in a very short-lived memory context. */
+	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+	/* Create the context. */
+	pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers);
+	pcxt->library_name = pstrdup(library_name);
+	pcxt->function_name = pstrdup(function_name);
+
+	/* Restore previous memory context. */
+	MemoryContextSwitchTo(oldcontext);
+
+	return pcxt;
+}
+
+/*
+ * Establish the dynamic shared memory segment for a parallel context and
+ * copy state and other bookkeeping information that will be needed by
+ * parallel workers into it.
+ */
+void
+InitializeParallelDSM(ParallelContext *pcxt)
+{
+	MemoryContext	oldcontext;
+	Size	guc_len;
+	Size	combocidlen;
+	Size	tsnaplen;
+	Size	asnaplen;
+	Size	segsize;
+	int		i;
+	FixedParallelState *fps;
+	char   *gucspace;
+	char   *combocidspace;
+	char   *tsnapspace;
+	char   *asnapspace;
+	char   *error_queue_space;
+	Snapshot	transaction_snapshot = GetTransactionSnapshot();
+	Snapshot	active_snapshot = GetActiveSnapshot();
+
+	/* We might be running in a very short-lived memory context. */
+	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+	/* Allocate space for worker information. */
+	pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
+
+	/*
+	 * Estimate how much space we'll need for state sharing.
+	 *
+	 * If you add more chunks here, you probably need more keys, too.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
+	guc_len = EstimateGUCStateSpace();
+	shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
+	combocidlen = EstimateComboCIDStateSpace();
+	shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
+	tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
+	asnaplen = EstimateSnapshotSpace(active_snapshot);
+	shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
+	shm_toc_estimate_keys(&pcxt->estimator, 5);
+
+	/* Estimate how much space we'll need for error queues. */
+	StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
+		PARALLEL_ERROR_QUEUE_SIZE,
+		"parallel error queue size not buffer-aligned");
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Estimate how much we'll need for extension entrypoint information. */
+	if (pcxt->library_name != NULL)
+	{
+		Assert(pcxt->entrypoint == ParallelExtensionTrampoline);
+		Assert(pcxt->function_name != NULL);
+		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name)
+							   + strlen(pcxt->function_name) + 2);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+
+	/* Create DSM and initialize with new table of contents. */
+	segsize = shm_toc_estimate(&pcxt->estimator);
+	pcxt->seg = dsm_create(segsize);
+	pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
+							   dsm_segment_address(pcxt->seg),
+							   segsize);
+
+	/* Initialize fixed-size state in shared memory. */
+	fps = (FixedParallelState *)
+		shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
+	fps->database_id = MyDatabaseId;
+	fps->authenticated_user_id = GetAuthenticatedUserId();
+	GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
+	fps->parallel_master_pgproc = MyProc;
+	fps->parallel_master_pid = MyProcPid;
+	fps->parallel_master_backend_id = MyBackendId;
+	fps->entrypoint = pcxt->entrypoint;
+	SpinLockInit(&fps->mutex);
+	fps->workers_expected = pcxt->nworkers;
+	fps->workers_attached = 0;
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
+
+	/* Serialize GUC state to dynamic shared memory. */
+	gucspace = shm_toc_allocate(pcxt->toc, guc_len);
+	SerializeGUCState(guc_len, gucspace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
+
+	/* Serialize combo CID state to dynamic shared memory. */
+	combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
+	SerializeComboCIDState(combocidlen, combocidspace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
+
+	/* Serialize transaction snapshots to dynamic shared memory. */
+	tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
+	SerializeSnapshot(transaction_snapshot, tsnaplen, tsnapspace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, tsnapspace);
+	asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
+	SerializeSnapshot(active_snapshot, asnaplen, asnapspace);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
+
+	/*
+	 * Establish error queues in dynamic shared memory.
+	 *
+	 * These queues should be used only for transmitting ErrorResponse,
+	 * NoticeResponse, and NotifyResponse protocol messages.  Tuple data should
+	 * be transmitted via a separate (possibly larger?) queue.
+	 */
+	error_queue_space =
+	   shm_toc_allocate(pcxt->toc, PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers);
+	for (i = 0; i < pcxt->nworkers; ++i)
+	{
+		shm_mq *mq;
+
+		mq = shm_mq_create(error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE,
+						   PARALLEL_ERROR_QUEUE_SIZE);
+		shm_mq_set_receiver(mq, MyProc);
+		pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+	}
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
+
+	/* Serialize extension entrypoint information to dynamic shared memory. */
+	if (pcxt->library_name != NULL)
+	{
+		Size	lnamelen = strlen(pcxt->library_name);
+		char *extensionstate;
+
+		extensionstate = shm_toc_allocate(pcxt->toc, lnamelen
+										  + strlen(pcxt->function_name) + 2);
+		strcpy(extensionstate, pcxt->library_name);
+		strcpy(extensionstate + lnamelen + 1, pcxt->function_name);
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE,
+					   extensionstate);
+	}
+
+	/* Restore previous memory context. */
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Launch parallel workers.
+ */
+void
+LaunchParallelWorkers(ParallelContext *pcxt)
+{
+	MemoryContext	oldcontext;
+	BackgroundWorker	worker;
+	int		i;
+
+	/* We might be running in a very short-lived memory context. */
+	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
+
+	/* Configure a worker. */
+	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
+			 MyProcPid);
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_main = ParallelMain;
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
+	worker.bgw_notify_pid = MyProcPid;
+
+	/*
+	 * Start workers.
+	 *
+	 * The caller must be able to tolerate ending up with fewer workers than
+	 * expected, so there is no need to throw an error here if registration
+	 * fails.  It wouldn't help much anyway, because registering the worker
+	 * in no way guarantees that it will start up and initialize successfully.
+	 * We do, however, give up on registering any more workers once
+	 * registration fails the first time; no sense beating our head against
+	 * a brick wall.
+	 */
+	for (i = 0; i < pcxt->nworkers; ++i)
+		if (!RegisterDynamicBackgroundWorker(&worker,
+											 &pcxt->worker[i].bgwhandle))
+			break;
+
+	/* Restore previous memory context. */
+	MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Destroy a parallel context.
+ */
+void
+DestroyParallelContext(ParallelContext *pcxt)
+{
+	int		i;
+
+	/*
+	 * Be careful about order of operations here!  We remove the parallel
+	 * context from the list before we do anything else; otherwise, if an
+	 * error occurs during a subsequent step, we might try to nuke it again
+	 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
+	 */
+	dlist_delete(&pcxt->node);
+
+	/*
+	 * If any background workers have been started, terminate them all.
+	 * To avoid leaking memory, release the background worker and the message
+	 * queue handle.  This doesn't actually detach the message queue; we'll
+	 * take care of that by detaching the dynamic shared memory segment itself.
+	 */
+	if (pcxt->worker != NULL)
+	{
+		for (i = 0; i < pcxt->nworkers; ++i)
+		{
+			if (pcxt->worker[i].bgwhandle != NULL)
+			{
+				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+				pfree(pcxt->worker[i].bgwhandle);
+			}
+			pfree(pcxt->worker[i].error_mqh);
+		}
+	}
+
+	/*
+	 * If we have allocated a shared memory segment, detach it.  This will
+	 * implicitly detach the error queues, and any other shared memory queues,
+	 * stored there.
+	 */
+	if (pcxt->seg != NULL)
+		dsm_detach(pcxt->seg);
+
+	if (pcxt->worker != NULL)	/* NULL if InitializeParallelDSM() never ran */
+		pfree(pcxt->worker);
+	pfree(pcxt);
+}
+
+/*
+ * Are there any parallel contexts currently active?
+ */
+bool
+ParallelContextActive(void)
+{
+	return !dlist_is_empty(&pcxt_list);
+}
+
+/*
+ * Handle receipt of an interrupt indicating a parallel worker message.
+ *
+ * If signal_handler is true, we are being called from a signal handler and must
+ * be extremely cautious about what we do here!
+ */
+void
+HandleParallelMessageInterrupt(bool signal_handler)
+{
+	int			save_errno = errno;
+
+	/* Don't joggle the elbow of proc_exit */
+	if (!proc_exit_inprogress)
+	{
+		InterruptPending = true;
+		ParallelMessagePending = true;
+
+		/*
+		 * If it's safe to interrupt, service the interrupt immediately.
+		 * (We shouldn't be in parallel mode if waiting for the user to send
+		 * a new query, but we could be waiting for a lock.)
+		 */
+		if ((ImmediateInterruptOK || !signal_handler) && InterruptHoldoffCount == 0
+			&& CritSectionCount == 0)
+		{
+			bool notify_enabled;
+			bool catchup_enabled;
+			bool save_ImmediateInterruptOK;
+
+			/*
+			 * Disable everything that might recursively interrupt us.
+			 *
+			 * If there were any possibility that disabling and re-enabling
+			 * interrupts or handling parallel messages might take a lock, we'd
+			 * need to HOLD_INTERRUPTS() as well, since taking a lock might
+			 * cause ImmediateInterruptOK to get temporarily reset to true.
+			 * But that shouldn't happen, so this is (hopefully) safe.  That's
+			 * good, because it lets us respond to query cancel and die
+			 * interrupts while we're in the midst of message-processing.
+			 */
+			save_ImmediateInterruptOK = ImmediateInterruptOK;
+			ImmediateInterruptOK = false;
+			notify_enabled = DisableNotifyInterrupt();
+			catchup_enabled = DisableCatchupInterrupt();
+
+			/* OK, do the work... */
+			HandleParallelMessages();
+
+			/* Now re-enable whatever was enabled before */
+			if (catchup_enabled)
+				EnableCatchupInterrupt();
+			if (notify_enabled)
+				EnableNotifyInterrupt();
+			ImmediateInterruptOK = save_ImmediateInterruptOK;
+		}
+	}
+
+	errno = save_errno;
+}
+
+/*
+ * Handle any queued protocol messages received from parallel workers.
+ */
+static void
+HandleParallelMessages(void)
+{
+	dlist_iter	iter;
+
+	ParallelMessagePending = false;
+
+	dlist_foreach(iter, &pcxt_list)
+	{
+		ParallelContext *pcxt;
+		int		i;
+		Size	nbytes;
+		void   *data;
+
+		pcxt = dlist_container(ParallelContext, node, iter.cur);
+		if (pcxt->worker == NULL)
+			continue;
+
+		for (i = 0; i < pcxt->nworkers; ++i)
+		{
+			if (pcxt->worker[i].error_mqh == NULL)
+				continue;
+
+			for (;;)
+			{
+				shm_mq_result	res;
+				StringInfoData	msg;
+
+				CHECK_FOR_INTERRUPTS();
+
+				res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
+									 &data, true);
+				if (res == SHM_MQ_WOULD_BLOCK)
+					break;		/* queue drained; go on to the next worker */
+				if (res == SHM_MQ_DETACHED)
+					ereport(ERROR,
+							(errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */
+							 errmsg("lost connection to parallel worker")));
+
+				/* SHM_MQ_SUCCESS: decode and dispatch the message. */
+				initStringInfo(&msg);
+				appendBinaryStringInfo(&msg, data, nbytes);
+				HandleParallelMessage(&msg);
+				pfree(msg.data);
+			}
+		}
+	}
+}
+
+/*
+ * Handle a single protocol message received from a single parallel worker.
+ */
+static void
+HandleParallelMessage(StringInfo msg)
+{
+	char	msgtype;
+
+	msgtype = pq_getmsgbyte(msg);
+
+	switch (msgtype)
+	{
+		case 'E':
+		case 'N':
+			{
+				ErrorData	edata;
+
+				/* Parse ErrorResponse or NoticeResponse. */
+				pq_parse_errornotice(msg, &edata);
+
+				/* Death of a worker is insufficient justification for suicide. */
+				edata.elevel = Min(edata.elevel, ERROR);
+
+				/*
+				 * XXX.  We should probably use the error context callbacks in
+				 * effect at the time the parallel context was created.
+				 */
+
+				ThrowErrorData(&edata);
+
+				break;
+			}
+
+		case 'A':
+			{
+				/* Propagate NotifyResponse. */
+				pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1);
+				break;
+			}
+
+		default:
+			{
+				elog(ERROR, "unknown message type: %c (%d bytes)",
+					 msgtype, msg->len);
+			}
+	}
+}
+
+/*
+ * End-of-subtransaction cleanup for parallel contexts.
+ *
+ * Currently, it's forbidden to enter or leave a subtransaction while
+ * parallel mode is in effect, so we could just blow away everything.  But
+ * we may want to relax that restriction in the future, so this code
+ * contemplates that there may be multiple subtransaction IDs in pcxt_list.
+ */
+void
+AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
+{
+	while (!dlist_is_empty(&pcxt_list))
+	{
+		ParallelContext *pcxt;
+
+		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
+		if (pcxt->subid != mySubId)
+			break;
+		if (isCommit)
+			elog(WARNING, "leaked parallel context");
+		DestroyParallelContext(pcxt);
+	}
+}
+
+/*
+ * End-of-transaction cleanup for parallel contexts.
+ */
+void
+AtEOXact_Parallel(bool isCommit)
+{
+	while (!dlist_is_empty(&pcxt_list))
+	{
+		ParallelContext *pcxt;
+
+		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
+		if (isCommit)
+			elog(WARNING, "leaked parallel context");
+		DestroyParallelContext(pcxt);
+	}
+}
+
+/*
+ * Main entrypoint for parallel workers.
+ */
+static void
+ParallelMain(Datum main_arg)
+{
+	dsm_segment *seg;
+	shm_toc *toc;
+	FixedParallelState *fps;
+	char   *error_queue_space;
+	shm_mq *mq;
+	shm_mq_handle *mqh;
+	char   *gucspace;
+	char   *tsnapspace;
+	char   *asnapspace;
+	char   *combocidspace;
+
+	/* Establish signal handlers. */
+	pqsignal(SIGTERM, handle_sigterm);
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up a memory context and resource owner. */
+	Assert(CurrentResourceOwner == NULL);
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
+	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+												 "parallel worker",
+												 ALLOCSET_DEFAULT_MINSIZE,
+												 ALLOCSET_DEFAULT_INITSIZE,
+												 ALLOCSET_DEFAULT_MAXSIZE);
+
+	/*
+	 * Now that we have a resource owner, we can attach to the dynamic
+	 * shared memory segment and read the table of contents.
+	 */
+	seg = dsm_attach(DatumGetUInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unable to map dynamic shared memory segment")));
+	toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/* Determine and set our worker number. */
+	fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);
+	Assert(fps != NULL);
+	Assert(ParallelWorkerNumber == -1);
+	SpinLockAcquire(&fps->mutex);
+	if (fps->workers_attached < fps->workers_expected)
+		ParallelWorkerNumber = fps->workers_attached++;
+	SpinLockRelease(&fps->mutex);
+	if (ParallelWorkerNumber < 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("too many parallel workers already attached")));
+
+	/*
+	 * Now that we have a worker number, we can find and attach to the error
+	 * queue provided for us.  That's good, because until we do that, any
+	 * errors that happen here will not be reported back to the process that
+	 * requested that this worker be launched.
+	 */
+	error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE);
+	mq = (shm_mq *) (error_queue_space +
+		ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
+	shm_mq_set_sender(mq, MyProc);
+	mqh = shm_mq_attach(mq, seg, NULL);
+	pq_redirect_to_shm_mq(mq, mqh);
+	pq_set_parallel_master(fps->parallel_master_pid,
+						   fps->parallel_master_backend_id);
+
+	/*
+	 * Hooray! Primary initialization is complete.  Now, we need to set up
+	 * our backend-local state to match the original backend.
+	 */
+
+	/* Restore database connection. */
+	BackgroundWorkerInitializeConnectionByOid(fps->database_id,
+											  fps->authenticated_user_id);
+
+	/* Restore GUC values from launching backend. */
+	gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
+	Assert(gucspace != NULL);
+	StartTransactionCommand();
+	RestoreGUCState(gucspace);
+	CommitTransactionCommand();
+
+	/* Handle local_preload_libraries and session_preload_libraries. */
+	process_session_preload_libraries();
+
+	/*
+	 * XXX.  At this point, we should restore the transaction state as it
+	 * exists in the master.  But since we don't have code for that yet, just
+	 * start a new transaction.
+	 */
+	StartTransactionCommand();
+
+	/* Restore combo CID state. */
+	combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID);
+	Assert(combocidspace != NULL);
+	RestoreComboCIDState(combocidspace);
+
+	/* Restore transaction snapshot. */
+	tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT);
+	Assert(tsnapspace != NULL);
+	RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
+							   fps->parallel_master_pgproc);
+
+	/* Restore active snapshot. */
+	asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT);
+	Assert(asnapspace != NULL);
+	PushActiveSnapshot(RestoreSnapshot(asnapspace));
+
+	/* Restore user ID and security context. */
+	SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);
+
+	/*
+	 * We've initialized all of our state now; nothing should change hereafter.
+	 */
+	EnterParallelMode();
+
+	/*
+	 * Time to do the real work: invoke the caller-supplied code.
+	 *
+	 * Note that it's unsafe for this entrypoint to be a function living in a
+	 * dynamically loaded module, because it might not be loaded at the same
+	 * address in every process.  Use CreateParallelContextForExtension()
+	 * rather than CreateParallelContext() to handle that case.
+	 */
+	fps->entrypoint(toc);
+
+	/*
+	 * XXX. There's probably some end-of-parallel-phase cleanup that needs to
+	 * happen here; we shouldn't just exit in the middle of a transaction.
+	 * But I don't know what's needed yet.
+	 */
+}
+
+/*
+ * In-core trampoline to invoke extension entrypoints.
+ *
+ * See comments in ParallelMain() and StartBackgroundWorker() for why we
+ * need this.
+ */
+static void
+ParallelExtensionTrampoline(shm_toc *toc)
+{
+	char   *extensionstate;
+	char   *library_name;
+	char   *function_name;
+	parallel_worker_main_type entrypt;
+
+	extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE);
+	Assert(extensionstate != NULL);
+	library_name = extensionstate;
+	function_name = extensionstate + strlen(library_name) + 1;
+
+	entrypt = (parallel_worker_main_type)
+		load_external_function(library_name, function_name, true, NULL);
+	entrypt(toc);
+}
+
+/*
+ * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just
+ * like a normal backend.  The next CHECK_FOR_INTERRUPTS() will do the right
+ * thing.
+ */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+	int		save_errno = errno;
+
+	if (MyProc)
+		SetLatch(&MyProc->procLatch);
+
+	if (!proc_exit_inprogress)
+	{
+		InterruptPending = true;
+		ProcDiePending = true;
+	}
+
+	errno = save_errno;
+}
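Reviewer's note on ParallelExtensionTrampoline() above: it reads the PARALLEL_KEY_EXTENSION_TRAMPOLINE chunk as two NUL-terminated strings stored back to back, with the library name first and the function name immediately after it. The writer side is not shown in this part of the patch, so the following is only a minimal standalone sketch of that layout. pack_entrypoint() and unpack_entrypoint() are hypothetical helpers invented for illustration, not functions from the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (not in the patch): store library_name and
 * function_name back to back in buf, each with its terminating NUL.
 * Returns the number of bytes written, or 0 if buf is too small.
 */
static size_t
pack_entrypoint(char *buf, size_t buflen,
				const char *library_name, const char *function_name)
{
	size_t		liblen = strlen(library_name) + 1;
	size_t		funclen = strlen(function_name) + 1;

	assert(buf != NULL);
	if (liblen + funclen > buflen)
		return 0;
	memcpy(buf, library_name, liblen);
	memcpy(buf + liblen, function_name, funclen);
	return liblen + funclen;
}

/*
 * Same pointer arithmetic as ParallelExtensionTrampoline(): the second
 * string starts one byte past the first string's terminating NUL.
 */
static void
unpack_entrypoint(char *buf, char **library_name, char **function_name)
{
	*library_name = buf;
	*function_name = buf + strlen(buf) + 1;
}
```

Passing names instead of a function pointer is what lets each worker resolve the symbol in its own address space, which is the concern raised in the ParallelMain() comment.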
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index c541156..92f657e 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -50,6 +50,13 @@ GetNewTransactionId(bool isSubXact)
 	TransactionId xid;
 
 	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for new XIDs after that point.
+	 */
+	if (IsInParallelMode())
+		elog(ERROR, "cannot assign TransactionIds during a parallel operation");
+
+	/*
 	 * During bootstrap initialization, we return the special bootstrap
 	 * transaction id.
 	 */
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8b2f714..2fa2c11 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -22,6 +22,7 @@
 
 #include "access/commit_ts.h"
 #include "access/multixact.h"
+#include "access/parallel.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -152,6 +153,7 @@ typedef struct TransactionStateData
 	bool		prevXactReadOnly;		/* entry-time xact r/o state */
 	bool		startedInRecovery;		/* did we start in recovery? */
 	bool		didLogXid;		/* has xid been included in WAL record? */
+	bool		parallelMode;	/* current transaction in parallel operation? */
 	struct TransactionStateData *parent;		/* back link to parent */
 } TransactionStateData;
 
@@ -182,6 +184,7 @@ static TransactionStateData TopTransactionStateData = {
 	false,						/* entry-time xact r/o state */
 	false,						/* startedInRecovery */
 	false,						/* didLogXid */
+	false,						/* parallelMode */
 	NULL						/* link to parent state block */
 };
 
@@ -642,7 +645,16 @@ GetCurrentCommandId(bool used)
 {
 	/* this is global to a transaction, not subtransaction-local */
 	if (used)
+	{
+		/*
+		 * Forbid setting currentCommandIdUsed in parallel mode, because we
+		 * have no provision for communicating this back to the master.  We
+		 * could relax this restriction when currentCommandIdUsed was already
+		 * true at the start of the parallel operation.
+		 */
+		Assert(!CurrentTransactionState->parallelMode);
 		currentCommandIdUsed = true;
+	}
 	return currentCommandId;
 }
 
@@ -789,6 +801,53 @@ TransactionStartedDuringRecovery(void)
 }
 
 /*
+ *	EnterParallelMode
+ */
+void
+EnterParallelMode(void)
+{
+	TransactionState s = CurrentTransactionState;
+
+	/*
+	 * Workers synchronize transaction state at the beginning of each
+	 * parallel operation, so we can't let the transaction state be changed
+	 * after that point.  That includes the parallel mode flag itself.
+	 */
+	Assert(!s->parallelMode);
+
+	s->parallelMode = true;
+}
+
+/*
+ *	ExitParallelMode
+ */
+void
+ExitParallelMode(void)
+{
+	TransactionState s = CurrentTransactionState;
+
+	Assert(s->parallelMode);
+	Assert(!ParallelContextActive());
+
+	s->parallelMode = false;
+}
+
+/*
+ *	IsInParallelMode
+ *
+ * Are we in a parallel operation, as either the master or a worker?  Check
+ * this to prohibit operations that change backend-local state expected to
+ * match across all workers.  Mere caches usually don't require such a
+ * restriction.  State modified in a strict push/pop fashion, such as the
+ * active snapshot stack, is often fine.
+ */
+bool
+IsInParallelMode(void)
+{
+	return CurrentTransactionState->parallelMode;
+}
+
+/*
  *	CommandCounterIncrement
  */
 void
@@ -802,6 +861,14 @@ CommandCounterIncrement(void)
 	 */
 	if (currentCommandIdUsed)
 	{
+		/*
+		 * Workers synchronize transaction state at the beginning of each
+		 * parallel operation, so we can't account for new commands after that
+		 * point.
+		 */
+		if (IsInParallelMode())
+			elog(ERROR, "cannot start commands during a parallel operation");
+
 		currentCommandId += 1;
 		if (currentCommandId == InvalidCommandId)
 		{
@@ -1705,6 +1772,8 @@ StartTransaction(void)
 	s = &TopTransactionStateData;
 	CurrentTransactionState = s;
 
+	Assert(!IsInParallelMode());
+
 	/*
 	 * check the current transaction state
 	 */
@@ -1835,6 +1904,8 @@ CommitTransaction(void)
 	TransactionState s = CurrentTransactionState;
 	TransactionId latestXid;
 
+	Assert(!IsInParallelMode());
+
 	ShowTransactionState("CommitTransaction");
 
 	/*
@@ -1921,6 +1992,13 @@ CommitTransaction(void)
 
 	TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
 
+	/* Exit from parallel mode, if necessary. */
+	if (IsInParallelMode())
+	{
+		AtEOXact_Parallel(true);
+		s->parallelMode = false;
+	}
+
 	/*
 	 * Let others know about no transaction in progress by me. Note that this
 	 * must be done _before_ releasing locks we hold and _after_
@@ -2040,6 +2118,8 @@ PrepareTransaction(void)
 	GlobalTransaction gxact;
 	TimestampTz prepared_at;
 
+	Assert(!IsInParallelMode());
+
 	ShowTransactionState("PrepareTransaction");
 
 	/*
@@ -2391,6 +2471,13 @@ AbortTransaction(void)
 
 	TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid);
 
+	/* Exit from parallel mode, if necessary. */
+	if (IsInParallelMode())
+	{
+		AtEOXact_Parallel(false);
+		s->parallelMode = false;
+	}
+
 	/*
 	 * Let others know about no transaction in progress by me. Note that this
 	 * must be done _before_ releasing locks we hold and _after_
@@ -3540,6 +3627,16 @@ DefineSavepoint(char *name)
 {
 	TransactionState s = CurrentTransactionState;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for new subtransactions after that
+	 * point.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot define savepoints during a parallel operation")));
+
 	switch (s->blockState)
 	{
 		case TBLOCK_INPROGRESS:
@@ -3594,6 +3691,16 @@ ReleaseSavepoint(List *options)
 	ListCell   *cell;
 	char	   *name = NULL;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for transaction state change after that
+	 * point.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot release savepoints during a parallel operation")));
+
 	switch (s->blockState)
 	{
 			/*
@@ -3694,6 +3801,16 @@ RollbackToSavepoint(List *options)
 	ListCell   *cell;
 	char	   *name = NULL;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for transaction state change after that
+	 * point.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot roll back to savepoints during a parallel operation")));
+
 	switch (s->blockState)
 	{
 			/*
@@ -3806,6 +3923,16 @@ BeginInternalSubTransaction(char *name)
 {
 	TransactionState s = CurrentTransactionState;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for new subtransactions after that
+	 * point.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot start subtransactions during a parallel operation")));
+
 	switch (s->blockState)
 	{
 		case TBLOCK_STARTED:
@@ -3860,6 +3987,18 @@ ReleaseCurrentSubTransaction(void)
 {
 	TransactionState s = CurrentTransactionState;
 
+	/*
+	 * Workers synchronize transaction state at the beginning of each parallel
+	 * operation, so we can't account for commit of subtransactions after that
+	 * point.  This should not happen anyway.  Code calling this would
+	 * typically have called BeginInternalSubTransaction() first, failing
+	 * there.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot commit subtransactions during a parallel operation")));
+
 	if (s->blockState != TBLOCK_SUBINPROGRESS)
 		elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s",
 			 BlockStateAsString(s->blockState));
@@ -3882,6 +4021,14 @@ RollbackAndReleaseCurrentSubTransaction(void)
 {
 	TransactionState s = CurrentTransactionState;
 
+	/*
+	 * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted
+	 * during parallel operations.  That's because we may be in the master,
+	 * recovering from an error thrown while we were in parallel mode.  We
+	 * won't reach here in a worker, because BeginInternalSubTransaction()
+	 * will have failed.
+	 */
+
 	switch (s->blockState)
 	{
 			/* Must be in a subtransaction */
@@ -4157,6 +4304,13 @@ CommitSubTransaction(void)
 		elog(WARNING, "CommitSubTransaction while in %s state",
 			 TransStateAsString(s->state));
 
+	/* Exit from parallel mode, if necessary. */
+	if (IsInParallelMode())
+	{
+		AtEOSubXact_Parallel(true, s->subTransactionId);
+		s->parallelMode = false;
+	}
+
 	/* Pre-commit processing goes here */
 
 	CallSubXactCallbacks(SUBXACT_EVENT_PRE_COMMIT_SUB, s->subTransactionId,
@@ -4315,6 +4469,13 @@ AbortSubTransaction(void)
 	 */
 	SetUserIdAndSecContext(s->prevUser, s->prevSecContext);
 
+	/* Exit from parallel mode, if necessary. */
+	if (IsInParallelMode())
+	{
+		AtEOSubXact_Parallel(false, s->subTransactionId);
+		s->parallelMode = false;
+	}
+
 	/*
 	 * We can skip all this stuff if the subxact failed before creating a
 	 * ResourceOwner...
@@ -4455,6 +4616,7 @@ PushTransaction(void)
 	s->blockState = TBLOCK_SUBBEGIN;
 	GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
 	s->prevXactReadOnly = XactReadOnly;
+	s->parallelMode = false;
 
 	CurrentTransactionState = s;
 
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 4a542e6..04ecbbb 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -476,7 +476,7 @@ BootstrapModeMain(void)
 	 */
 	InitProcess();
 
-	InitPostgres(NULL, InvalidOid, NULL, NULL);
+	InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL);
 
 	/* Initialize stuff for bootstrap-file processing */
 	for (i = 0; i < MAXATTR; i++)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 08abe14..4790df5 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -914,9 +914,10 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
 	{
 		Assert(rel);
 
-		/* check read-only transaction */
+		/* check read-only transaction and parallel mode */
 		if (XactReadOnly && !rel->rd_islocaltemp)
 			PreventCommandIfReadOnly("COPY FROM");
+		PreventCommandIfParallelMode("COPY FROM");
 
 		cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
 							   stmt->attlist, stmt->options);
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index ba5b938..0b422dc 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -551,6 +551,13 @@ nextval_internal(Oid relid)
 	if (!seqrel->rd_islocaltemp)
 		PreventCommandIfReadOnly("nextval()");
 
+	/*
+	 * Forbid this during parallel operation because, to make it work,
+	 * the cooperating backends would need to share the backend-local cached
+	 * sequence information.  Currently, we don't support that.
+	 */
+	PreventCommandIfParallelMode("nextval()");
+
 	if (elm->last != elm->cached)		/* some numbers were cached */
 	{
 		Assert(elm->last_valid);
@@ -838,6 +845,13 @@ do_setval(Oid relid, int64 next, bool iscalled)
 	if (!seqrel->rd_islocaltemp)
 		PreventCommandIfReadOnly("setval()");
 
+	/*
+	 * Forbid this during parallel operation because, to make it work,
+	 * the cooperating backends would need to share the backend-local cached
+	 * sequence information.  Currently, we don't support that.
+	 */
+	PreventCommandIfParallelMode("setval()");
+
 	/* lock page' buffer and read tuple */
 	seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 8c799d3..9223f5e 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -135,8 +135,20 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
 	/*
 	 * If the transaction is read-only, we need to check if any writes are
 	 * planned to non-temporary tables.  EXPLAIN is considered read-only.
+	 *
+	 * Don't allow writes in parallel mode.  Supporting UPDATE and DELETE would
+	 * require (a) storing the combocid hash in shared memory, rather than
+	 * synchronizing it just once at the start of parallelism, and (b) an
+	 * alternative to heap_update()'s reliance on xmax for mutual exclusion.
+	 * INSERT may have no such troubles, but we forbid it to simplify the
+	 * checks.
+	 *
+	 * We have lower-level defenses in CommandCounterIncrement and elsewhere
+	 * against performing unsafe operations in parallel mode, but this gives
+	 * a more user-friendly error message.
 	 */
-	if (XactReadOnly && !(eflags & EXEC_FLAG_EXPLAIN_ONLY))
+	if ((XactReadOnly || IsInParallelMode()) &&
+		!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
 		ExecCheckXactReadOnly(queryDesc->plannedstmt);
 
 	/*
@@ -679,18 +691,23 @@ ExecCheckRTEPerms(RangeTblEntry *rte)
 }
 
 /*
- * Check that the query does not imply any writes to non-temp tables.
+ * Check that the query does not imply any writes to non-temp tables;
+ * unless we're in parallel mode, in which case don't even allow writes
+ * to temp tables.
  *
  * Note: in a Hot Standby slave this would need to reject writes to temp
- * tables as well; but an HS slave can't have created any temp tables
- * in the first place, so no need to check that.
+ * tables just as we do in parallel mode; but an HS slave can't have created
+ * any temp tables in the first place, so no need to check that.
  */
 static void
 ExecCheckXactReadOnly(PlannedStmt *plannedstmt)
 {
 	ListCell   *l;
 
-	/* Fail if write permissions are requested on any non-temp table */
+	/*
+	 * Fail if write permissions are requested in parallel mode for any
+	 * table (temp or non-temp); otherwise, fail only for non-temp tables.
+	 */
 	foreach(l, plannedstmt->rtable)
 	{
 		RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);
@@ -701,6 +718,8 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt)
 		if ((rte->requiredPerms & (~ACL_SELECT)) == 0)
 			continue;
 
+		PreventCommandIfParallelMode(CreateCommandTag((Node *) plannedstmt));
+
 		if (isTempNamespace(get_rel_namespace(rte->relid)))
 			continue;
 
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index 4d11260..62b615a 100644
--- a/src/backend/executor/functions.c
+++ b/src/backend/executor/functions.c
@@ -513,6 +513,9 @@ init_execution_state(List *queryTree_list,
 					   errmsg("%s is not allowed in a non-volatile function",
 							  CreateCommandTag(stmt))));
 
+			if (IsInParallelMode() && !CommandIsReadOnly(stmt))
+				PreventCommandIfParallelMode(CreateCommandTag(stmt));
+
 			/* OK, build the execution_state for this query */
 			newes = (execution_state *) palloc(sizeof(execution_state));
 			if (preves)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index cfa4a24..72d55c7 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -23,6 +23,7 @@
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/spi_priv.h"
+#include "miscadmin.h"
 #include "tcop/pquery.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -1322,13 +1323,14 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan,
 	}
 
 	/*
-	 * If told to be read-only, we'd better check for read-only queries. This
-	 * can't be done earlier because we need to look at the finished, planned
-	 * queries.  (In particular, we don't want to do it between GetCachedPlan
-	 * and PortalDefineQuery, because throwing an error between those steps
-	 * would result in leaking our plancache refcount.)
+	 * If told to be read-only, or in parallel mode, verify that this query
+	 * is in fact read-only.  This can't be done earlier because we need to
+	 * look at the finished, planned queries.  (In particular, we don't want
+	 * to do it between GetCachedPlan and PortalDefineQuery, because throwing
+	 * an error between those steps would result in leaking our plancache
+	 * refcount.)
 	 */
-	if (read_only)
+	if (read_only || IsInParallelMode())
 	{
 		ListCell   *lc;
 
@@ -1337,11 +1339,16 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan,
 			Node	   *pstmt = (Node *) lfirst(lc);
 
 			if (!CommandIsReadOnly(pstmt))
-				ereport(ERROR,
-						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				/* translator: %s is a SQL statement name */
-					   errmsg("%s is not allowed in a non-volatile function",
-							  CreateCommandTag(pstmt))));
+			{
+				if (read_only)
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					/* translator: %s is a SQL statement name */
+							 errmsg("%s is not allowed in a non-volatile function",
+									CreateCommandTag(pstmt))));
+				else
+					PreventCommandIfParallelMode(CreateCommandTag(pstmt));
+			}
 		}
 	}
 
@@ -2129,6 +2136,9 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 					   errmsg("%s is not allowed in a non-volatile function",
 							  CreateCommandTag(stmt))));
 
+			if (IsInParallelMode() && !CommandIsReadOnly(stmt))
+				PreventCommandIfParallelMode(CreateCommandTag(stmt));
+
 			/*
 			 * If not read-only mode, advance the command counter before each
 			 * command and update the snapshot.
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 6e6b429..b07978c 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -16,12 +16,15 @@
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqmq.h"
+#include "miscadmin.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 
 static shm_mq *pq_mq;
 static shm_mq_handle *pq_mq_handle;
 static bool pq_mq_busy = false;
+static pid_t pq_mq_parallel_master_pid = 0;
+static BackendId pq_mq_parallel_master_backend_id = InvalidBackendId;
 
 static void mq_comm_reset(void);
 static int	mq_flush(void);
@@ -57,6 +60,18 @@ pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh)
 	FrontendProtocol = PG_PROTOCOL_LATEST;
 }
 
+/*
+ * Arrange to SendProcSignal() to the parallel master each time we transmit
+ * message data via the shm_mq.
+ */
+void
+pq_set_parallel_master(pid_t pid, BackendId backend_id)
+{
+	Assert(PqCommMethods == &PqCommMqMethods);
+	pq_mq_parallel_master_pid = pid;
+	pq_mq_parallel_master_backend_id = backend_id;
+}
+
 static void
 mq_comm_reset(void)
 {
@@ -120,7 +135,23 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 	iov[1].len = len;
 
 	Assert(pq_mq_handle != NULL);
-	result = shm_mq_sendv(pq_mq_handle, iov, 2, false);
+
+	for (;;)
+	{
+		result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
+
+		if (pq_mq_parallel_master_pid != 0)
+			SendProcSignal(pq_mq_parallel_master_pid,
+						   PROCSIG_PARALLEL_MESSAGE,
+						   pq_mq_parallel_master_backend_id);
+
+		if (result != SHM_MQ_WOULD_BLOCK)
+			break;
+
+		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0);
+		CHECK_FOR_INTERRUPTS();
+		ResetLatch(&MyProc->procLatch);
+	}
 
 	pq_mq_busy = false;
 
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index 1d6e3f3..8fb39d3 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -471,7 +471,7 @@ AutoVacLauncherMain(int argc, char *argv[])
 	InitProcess();
 #endif
 
-	InitPostgres(NULL, InvalidOid, NULL, NULL);
+	InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL);
 
 	SetProcessingMode(NormalProcessing);
 
@@ -1665,7 +1665,7 @@ AutoVacWorkerMain(int argc, char *argv[])
 		 * Note: if we have selected a just-deleted database (due to using
 		 * stale stats info), we'll fail and exit here.
 		 */
-		InitPostgres(NULL, dbid, NULL, dbname);
+		InitPostgres(NULL, dbid, NULL, InvalidOid, dbname);
 		SetProcessingMode(NormalProcessing);
 		set_ps_display(dbname, false);
 		ereport(DEBUG1,
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 5106f52..451f814 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -5305,7 +5305,30 @@ BackgroundWorkerInitializeConnection(char *dbname, char *username)
 				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 				 errmsg("database connection requirement not indicated during registration")));
 
-	InitPostgres(dbname, InvalidOid, username, NULL);
+	InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL);
+
+	/* it had better not gotten out of "init" mode yet */
+	if (!IsInitProcessingMode())
+		ereport(ERROR,
+				(errmsg("invalid processing mode in background worker")));
+	SetProcessingMode(NormalProcessing);
+}
+
+/*
+ * Connect background worker to a database using OIDs.
+ */
+void
+BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid)
+{
+	BackgroundWorker *worker = MyBgworkerEntry;
+
+	/* XXX is this the right errcode? */
+	if (!(worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION))
+		ereport(FATAL,
+				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+				 errmsg("database connection requirement not indicated during registration")));
+
+	InitPostgres(NULL, dboid, NULL, useroid, NULL);
 
 	/* it had better not gotten out of "init" mode yet */
 	if (!IsInitProcessingMode())
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index d953545..7b7b57a 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1686,6 +1686,50 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid)
 }
 
 /*
+ * ProcArrayInstallRestoredXmin -- install restored xmin into MyPgXact->xmin
+ *
+ * This is like ProcArrayInstallImportedXmin, but we have a pointer to the
+ * PGPROC of the transaction from which we imported the snapshot, rather than
+ * an XID.
+ *
+ * Returns TRUE if successful, FALSE if source xact is no longer running.
+ */
+bool
+ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
+{
+	bool		result = false;
+	TransactionId xid;
+	volatile PGXACT *pgxact;
+
+	Assert(TransactionIdIsNormal(xmin));
+	Assert(proc != NULL);
+
+	/* Get lock so source xact can't end while we're doing this */
+	LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+	pgxact = &allPgXact[proc->pgprocno];
+
+	/*
+	 * Be certain that the referenced PGPROC has an advertised xmin which
+	 * is no later than the one we're installing, so that the system-wide
+	 * xmin can't go backwards.  Also, make sure it's running in the same
+	 * database, so that the per-database xmin cannot go backwards.
+	 */
+	xid = pgxact->xmin;		/* fetch just once */
+	if (proc->databaseId == MyDatabaseId &&
+		TransactionIdIsNormal(xid) &&
+		TransactionIdPrecedesOrEquals(xid, xmin))
+	{
+		MyPgXact->xmin = TransactionXmin = xmin;
+		result = true;
+	}
+
+	LWLockRelease(ProcArrayLock);
+
+	return result;
+}
+
+/*
  * GetRunningTransactionData -- returns information about running transactions.
  *
  * Similar to GetSnapshotData but returns more information. We include
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index cd9a287..0678678 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -17,6 +17,7 @@
 #include <signal.h>
 #include <unistd.h>
 
+#include "access/parallel.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "storage/latch.h"
@@ -274,6 +275,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_NOTIFY_INTERRUPT))
 		HandleNotifyInterrupt();
 
+	if (CheckProcSignal(PROCSIG_PARALLEL_MESSAGE))
+		HandleParallelMessageInterrupt(true);
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);
 
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index f126118..dd53426 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -1656,6 +1656,14 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 
 	Assert(!RecoveryInProgress());
 
+	/*
+	 * Since all parts of a serializable transaction must use the same
+	 * snapshot, it is too late to establish one after a parallel operation
+	 * has begun.
+	 */
+	if (IsInParallelMode())
+		elog(ERROR, "cannot establish serializable snapshot during a parallel operation");
+
 	proc = MyProc;
 	Assert(proc != NULL);
 	GET_VXID_FROM_PGPROC(vxid, *proc);
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index cc62b2c..4285748 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -37,6 +37,7 @@
 #include "rusagestub.h"
 #endif
 
+#include "access/parallel.h"
 #include "access/printtup.h"
 #include "access/xact.h"
 #include "catalog/pg_type.h"
@@ -2968,7 +2969,8 @@ ProcessInterrupts(void)
 					 errmsg("canceling statement due to user request")));
 		}
 	}
-	/* If we get here, do nothing (probably, QueryCancelPending was reset) */
+	if (ParallelMessagePending)
+		HandleParallelMessageInterrupt(false);
 }
 
 
@@ -3704,7 +3706,7 @@ PostgresMain(int argc, char *argv[],
 	 * it inside InitPostgres() instead.  In particular, anything that
 	 * involves database access should be there, not here.
 	 */
-	InitPostgres(dbname, InvalidOid, username, NULL);
+	InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL);
 
 	/*
 	 * If the PostmasterContext is still around, recycle the space; we don't
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index aa8fe88..c4862a9 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -128,14 +128,15 @@ CommandIsReadOnly(Node *parsetree)
 static void
 check_xact_readonly(Node *parsetree)
 {
-	if (!XactReadOnly)
+	/* Only perform the check if we have a reason to do so. */
+	if (!XactReadOnly && !IsInParallelMode())
 		return;
 
 	/*
 	 * Note: Commands that need to do more complicated checking are handled
 	 * elsewhere, in particular COPY and plannable statements do their own
-	 * checking.  However they should all call PreventCommandIfReadOnly to
-	 * actually throw the error.
+	 * checking.  However they should all call PreventCommandIfReadOnly
+	 * or PreventCommandIfParallelMode to actually throw the error.
 	 */
 
 	switch (nodeTag(parsetree))
@@ -207,6 +208,7 @@ check_xact_readonly(Node *parsetree)
 		case T_ImportForeignSchemaStmt:
 		case T_SecLabelStmt:
 			PreventCommandIfReadOnly(CreateCommandTag(parsetree));
+			PreventCommandIfParallelMode(CreateCommandTag(parsetree));
 			break;
 		default:
 			/* do nothing */
@@ -232,6 +234,24 @@ PreventCommandIfReadOnly(const char *cmdname)
 }
 
 /*
+ * PreventCommandIfParallelMode: throw error if current (sub)transaction is
+ * in parallel mode.
+ *
+ * This is useful mainly to ensure consistency of the error message wording;
+ * most callers have checked IsInParallelMode() for themselves.
+ */
+void
+PreventCommandIfParallelMode(const char *cmdname)
+{
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+		/* translator: %s is name of a SQL command, eg CREATE */
+				 errmsg("cannot execute %s during a parallel operation",
+						cmdname)));
+}
+
+/*
  * PreventCommandDuringRecovery: throw error if RecoveryInProgress
  *
  * The majority of operations that are unsafe in a Hot Standby slave
@@ -630,6 +650,7 @@ standard_ProcessUtility(Node *parsetree,
 		case T_ClusterStmt:
 			/* we choose to allow this during "read only" transactions */
 			PreventCommandDuringRecovery("CLUSTER");
+			/* forbidden in parallel mode due to CommandIsReadOnly */
 			cluster((ClusterStmt *) parsetree, isTopLevel);
 			break;
 
@@ -640,6 +661,7 @@ standard_ProcessUtility(Node *parsetree,
 				/* we choose to allow this during "read only" transactions */
 				PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ?
 											 "VACUUM" : "ANALYZE");
+				/* forbidden in parallel mode due to CommandIsReadOnly */
 				vacuum(stmt, InvalidOid, true, NULL, false, isTopLevel);
 			}
 			break;
@@ -716,6 +738,7 @@ standard_ProcessUtility(Node *parsetree,
 			 * outside a transaction block is presumed to be user error.
 			 */
 			RequireTransactionChain(isTopLevel, "LOCK TABLE");
+			/* forbidden in parallel mode due to CommandIsReadOnly */
 			LockTableCommand((LockStmt *) parsetree);
 			break;
 
@@ -747,6 +770,7 @@ standard_ProcessUtility(Node *parsetree,
 
 				/* we choose to allow this during "read only" transactions */
 				PreventCommandDuringRecovery("REINDEX");
+				/* forbidden in parallel mode due to CommandIsReadOnly */
 				switch (stmt->kind)
 				{
 					case REINDEX_OBJECT_INDEX:
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 8fccb4c..a045062 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -350,11 +350,10 @@ has_rolreplication(Oid roleid)
  * Initialize user identity during normal backend startup
  */
 void
-InitializeSessionUserId(const char *rolename)
+InitializeSessionUserId(const char *rolename, Oid roleid)
 {
 	HeapTuple	roleTup;
 	Form_pg_authid rform;
-	Oid			roleid;
 
 	/*
 	 * Don't do scans if we're bootstrapping, none of the system catalogs
@@ -365,7 +364,10 @@ InitializeSessionUserId(const char *rolename)
 	/* call only once */
 	AssertState(!OidIsValid(AuthenticatedUserId));
 
-	roleTup = SearchSysCache1(AUTHNAME, PointerGetDatum(rolename));
+	if (rolename != NULL)
+		roleTup = SearchSysCache1(AUTHNAME, PointerGetDatum(rolename));
+	else
+		roleTup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(roleid));
 	if (!HeapTupleIsValid(roleTup))
 		ereport(FATAL,
 				(errcode(ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION),
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index c348034..8f6d517 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -523,6 +523,9 @@ BaseInit(void)
  * name can be returned to the caller in out_dbname.  If out_dbname isn't
  * NULL, it must point to a buffer of size NAMEDATALEN.
  *
+ * Similarly, the username can be passed by name, using the username parameter,
+ * or by OID using the useroid parameter.
+ *
  * In bootstrap mode no parameters are used.  The autovacuum launcher process
  * doesn't use any parameters either, because it only goes far enough to be
  * able to read pg_database; it doesn't connect to any particular database.
@@ -537,7 +540,7 @@ BaseInit(void)
  */
 void
 InitPostgres(const char *in_dbname, Oid dboid, const char *username,
-			 char *out_dbname)
+			 Oid useroid, char *out_dbname)
 {
 	bool		bootstrap = IsBootstrapProcessingMode();
 	bool		am_superuser;
@@ -692,18 +695,18 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 					(errcode(ERRCODE_UNDEFINED_OBJECT),
 					 errmsg("no roles are defined in this database system"),
 					 errhint("You should immediately run CREATE USER \"%s\" SUPERUSER;.",
-							 username)));
+							 username != NULL ? username : "postgres")));
 	}
 	else if (IsBackgroundWorker)
 	{
-		if (username == NULL)
+		if (username == NULL && !OidIsValid(useroid))
 		{
 			InitializeSessionUserIdStandalone();
 			am_superuser = true;
 		}
 		else
 		{
-			InitializeSessionUserId(username);
+			InitializeSessionUserId(username, useroid);
 			am_superuser = superuser();
 		}
 	}
@@ -712,7 +715,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 		/* normal multiuser case */
 		Assert(MyProcPort != NULL);
 		PerformAuthentication(MyProcPort);
-		InitializeSessionUserId(username);
+		InitializeSessionUserId(username, useroid);
 		am_superuser = superuser();
 	}
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b1bff7f..a30d809 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -5656,6 +5656,20 @@ set_config_option(const char *name, const char *value,
 			elevel = ERROR;
 	}
 
+	/*
+	 * GUC_ACTION_SAVE changes are acceptable during a parallel operation,
+	 * because the current worker will also pop the change.  We're probably
+	 * dealing with a function having a proconfig entry.  Only the function's
+	 * body should observe the change, and peer workers do not share in the
+	 * execution of a function call started by this worker.
+	 *
+	 * Other changes might need to affect other workers, so forbid them.
+	 */
+	if (IsInParallelMode() && changeVal && action != GUC_ACTION_SAVE)
+		ereport(elevel,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot set parameters during a parallel operation")));
+
 	record = find_option(name, true, elevel);
 	if (record == NULL)
 	{
@@ -6929,6 +6943,15 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel)
 {
 	GucAction	action = stmt->is_local ? GUC_ACTION_LOCAL : GUC_ACTION_SET;
 
+	/*
+	 * Workers synchronize these parameters at the start of the parallel
+	 * operation; then, we block SET during the operation.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot set parameters during a parallel operation")));
+
 	switch (stmt->kind)
 	{
 		case VAR_SET_VALUE:
diff --git a/src/backend/utils/time/combocid.c b/src/backend/utils/time/combocid.c
index a70c754..d2f23c0 100644
--- a/src/backend/utils/time/combocid.c
+++ b/src/backend/utils/time/combocid.c
@@ -44,6 +44,7 @@
 #include "miscadmin.h"
 #include "access/htup_details.h"
 #include "access/xact.h"
+#include "storage/shmem.h"
 #include "utils/combocid.h"
 #include "utils/hsearch.h"
 #include "utils/memutils.h"
@@ -287,3 +288,76 @@ GetRealCmax(CommandId combocid)
 	Assert(combocid < usedComboCids);
 	return comboCids[combocid].cmax;
 }
+
+/*
+ * Estimate the amount of space required to serialize the current ComboCID
+ * state.
+ */
+Size
+EstimateComboCIDStateSpace(void)
+{
+	Size		size;
+
+	/* Add space required for saving usedComboCids */
+	size = sizeof(int);
+
+	/* Add space required for saving the combocids key */
+	size = add_size(size, mul_size(sizeof(ComboCidKeyData), usedComboCids));
+
+	return size;
+}
+
+/*
+ * Serialize the ComboCID state into memory, beginning at start_address.
+ * maxsize should be at least as large as the value returned by
+ * EstimateComboCIDStateSpace.
+ */
+void
+SerializeComboCIDState(Size maxsize, char *start_address)
+{
+	char	   *endptr;
+
+	/* First, we store the number of currently-existing ComboCIDs. */
+	* (int *) start_address = usedComboCids;
+
+	/* If maxsize is too small, throw an error. */
+	endptr = start_address + sizeof(int) +
+		(sizeof(ComboCidKeyData) * usedComboCids);
+	if (endptr < start_address || endptr > start_address + maxsize)
+		elog(ERROR, "not enough space to serialize ComboCID state");
+
+	/* Now, copy the actual cmin/cmax pairs. */
+	memcpy(start_address + sizeof(int), comboCids,
+		   (sizeof(ComboCidKeyData) * usedComboCids));
+}
+
+/*
+ * Read the ComboCID state at the specified address and initialize this
+ * backend with the same ComboCIDs.  This is only valid in a backend that
+ * currently has no ComboCIDs (and only makes sense if the transaction state
+ * is serialized and restored as well).
+ */
+void
+RestoreComboCIDState(char *comboCIDstate)
+{
+	int			num_elements;
+	ComboCidKeyData *keydata;
+	int			i;
+	CommandId	cid;
+
+	Assert(!comboCids && !comboHash);
+
+	/* First, we retrieve the number of ComboCIDs that were serialized. */
+	num_elements = * (int *) comboCIDstate;
+	keydata = (ComboCidKeyData *) (comboCIDstate + sizeof(int));
+
+	/* Use GetComboCommandId to restore each ComboCID. */
+	for (i = 0; i < num_elements; i++)
+	{
+		cid = GetComboCommandId(keydata[i].cmin, keydata[i].cmax);
+
+		/* Verify that we got the expected answer. */
+		if (cid != i)
+			elog(ERROR, "unexpected command ID while restoring combo CIDs");
+	}
+}
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index d601efe..b823827 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -155,6 +155,22 @@ static Snapshot CopySnapshot(Snapshot snapshot);
 static void FreeSnapshot(Snapshot snapshot);
 static void SnapshotResetXmin(void);
 
+/*
+ * Snapshot fields to be serialized.
+ *
+ * Only these fields need to be sent to the cooperating backend; the
+ * remaining ones can (and must) be set by the receiver upon restore.
+ */
+typedef struct SerializedSnapshotData
+{
+	TransactionId xmin;
+	TransactionId xmax;
+	uint32		xcnt;
+	int32		subxcnt;
+	bool		suboverflowed;
+	bool		takenDuringRecovery;
+	CommandId	curcid;
+} SerializedSnapshotData;
 
 /*
  * GetTransactionSnapshot
@@ -186,6 +202,10 @@ GetTransactionSnapshot(void)
 		Assert(RegisteredSnapshots == 0);
 		Assert(FirstXactSnapshot == NULL);
 
+		if (IsInParallelMode())
+			elog(ERROR,
+				 "cannot take query snapshot during a parallel operation");
+
 		/*
 		 * In transaction-snapshot mode, the first snapshot must live until
 		 * end of xact regardless of what the caller does with it, so we must
@@ -237,6 +257,13 @@ Snapshot
 GetLatestSnapshot(void)
 {
 	/*
+	 * We might be able to relax this, but nothing that could otherwise work
+	 * needs it.
+	 */
+	if (IsInParallelMode())
+		elog(ERROR, "cannot update SecondarySnapshot during a parallel operation");
+
+	/*
 	 * So far there are no cases requiring support for GetLatestSnapshot()
 	 * during logical decoding, but it wouldn't be hard to add if required.
 	 */
@@ -345,7 +372,8 @@ SnapshotSetCommandId(CommandId curcid)
  * in GetTransactionSnapshot.
  */
 static void
-SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
+SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid,
+					   PGPROC *sourceproc)
 {
 	/* Caller should have checked this already */
 	Assert(!FirstSnapshotSet);
@@ -392,7 +420,15 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid)
 	 * doesn't seem worth contorting the logic here to avoid two calls,
 	 * especially since it's not clear that predicate.c *must* do this.
 	 */
-	if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
+	if (sourceproc != NULL)
+	{
+		if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc))
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("could not import the requested snapshot"),
+			   errdetail("The source transaction is not running anymore.")));
+	}
+	else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid))
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("could not import the requested snapshot"),
@@ -548,11 +584,24 @@ PushCopiedSnapshot(Snapshot snapshot)
 void
 UpdateActiveSnapshotCommandId(void)
 {
+	CommandId	save_curcid, curcid;
 	Assert(ActiveSnapshot != NULL);
 	Assert(ActiveSnapshot->as_snap->active_count == 1);
 	Assert(ActiveSnapshot->as_snap->regd_count == 0);
 
-	ActiveSnapshot->as_snap->curcid = GetCurrentCommandId(false);
+	/*
+	 * Don't allow modification of the active snapshot during parallel
+	 * operation.  We share the snapshot with worker backends at the beginning
+	 * of the parallel operation, so any change to the snapshot can lead to
+	 * inconsistencies.  We have other defenses against
+	 * CommandCounterIncrement, but there are a few places that call this
+	 * directly, so we put an additional guard here.
+	 */
+	save_curcid = ActiveSnapshot->as_snap->curcid;
+	curcid = GetCurrentCommandId(false);
+	if (IsInParallelMode() && save_curcid != curcid)
+		elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation");
+	ActiveSnapshot->as_snap->curcid = curcid;
 }
 
 /*
@@ -1247,7 +1296,7 @@ ImportSnapshot(const char *idstr)
 			  errmsg("cannot import a snapshot from a different database")));
 
 	/* OK, install the snapshot */
-	SetTransactionSnapshot(&snapshot, src_xid);
+	SetTransactionSnapshot(&snapshot, src_xid, NULL);
 }
 
 /*
@@ -1350,3 +1399,152 @@ HistoricSnapshotGetTupleCids(void)
 	Assert(HistoricSnapshotActive());
 	return tuplecid_data;
 }
+
+/*
+ * EstimateSnapshotSpace
+ *		Returns the size needed to store the given snapshot.
+ *
+ * We are exporting only required fields from the Snapshot, stored in
+ * SerializedSnapshotData.
+ */
+Size
+EstimateSnapshotSpace(Snapshot snap)
+{
+	Size		size;
+
+	Assert(snap != InvalidSnapshot);
+	Assert(snap->satisfies == HeapTupleSatisfiesMVCC);
+
+	/* We allocate any XID arrays needed in the same palloc block. */
+	size = add_size(sizeof(SerializedSnapshotData),
+					mul_size(snap->xcnt, sizeof(TransactionId)));
+	if (snap->subxcnt > 0 &&
+		(!snap->suboverflowed || snap->takenDuringRecovery))
+		size = add_size(size,
+						mul_size(snap->subxcnt, sizeof(TransactionId)));
+
+	return size;
+}
+
+/*
+ * SerializeSnapshot
+ * 		Dumps the serialized snapshot (extracted from given snapshot) onto the
+ * 		memory location at start_address.
+ */
+void
+SerializeSnapshot(Snapshot snapshot, Size maxsize, char *start_address)
+{
+	SerializedSnapshotData *serialized_snapshot;
+
+	/* If the buffer is too small, throw an error */
+	if (maxsize < EstimateSnapshotSpace(snapshot))
+		elog(ERROR, "not enough space to serialize given snapshot");
+
+	Assert(snapshot->xcnt >= 0);
+	Assert(snapshot->subxcnt >= 0);
+
+	serialized_snapshot = (SerializedSnapshotData *) start_address;
+
+	/* Copy all required fields */
+	serialized_snapshot->xmin = snapshot->xmin;
+	serialized_snapshot->xmax = snapshot->xmax;
+	serialized_snapshot->xcnt = snapshot->xcnt;
+	serialized_snapshot->subxcnt = snapshot->subxcnt;
+	serialized_snapshot->suboverflowed = snapshot->suboverflowed;
+	serialized_snapshot->takenDuringRecovery = snapshot->takenDuringRecovery;
+	serialized_snapshot->curcid = snapshot->curcid;
+
+	/*
+	 * Ignore the SubXID array if it has overflowed, unless the snapshot
+	 * was taken during recovery - in that case, top-level XIDs are in subxip
+	 * as well, and we mustn't lose them.
+	 */
+	if (serialized_snapshot->suboverflowed && !snapshot->takenDuringRecovery)
+		serialized_snapshot->subxcnt = 0;
+
+	/* Copy XID array */
+	if (snapshot->xcnt > 0)
+		memcpy((TransactionId *) (serialized_snapshot + 1),
+			   snapshot->xip, snapshot->xcnt * sizeof(TransactionId));
+
+	/*
+	 * Copy SubXID array. Don't bother to copy it if it had overflowed,
+	 * though, because it's not used anywhere in that case. Except if it's a
+	 * snapshot taken during recovery; all the top-level XIDs are in subxip as
+	 * well in that case, so we mustn't lose them.
+	 */
+	if (serialized_snapshot->subxcnt > 0)
+	{
+		Size subxipoff = sizeof(SerializedSnapshotData) +
+			snapshot->xcnt * sizeof(TransactionId);
+
+		memcpy((TransactionId *) ((char *) serialized_snapshot + subxipoff),
+			   snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId));
+	}
+}
+
+/*
+ * RestoreSnapshot
+ *		Restore a serialized snapshot from the specified address.
+ *
+ * The copy is palloc'd in TopTransactionContext and has initial refcounts set
+ * to 0.  The returned snapshot has the copied flag set.
+ *
+ * Use RestoreTransactionSnapshot to additionally install the restored
+ * snapshot as the transaction snapshot.
+ */
+Snapshot
+RestoreSnapshot(char *start_address)
+{
+	SerializedSnapshotData *serialized_snapshot;
+	Size		size;
+	Size		subxipoff;
+	Snapshot	snapshot;
+
+	serialized_snapshot = (SerializedSnapshotData *) start_address;
+
+	/* We allocate any XID arrays needed in the same palloc block. */
+	size = subxipoff = sizeof(SnapshotData) +
+		serialized_snapshot->xcnt * sizeof(TransactionId);
+	size += serialized_snapshot->subxcnt * sizeof(TransactionId);
+
+	/* Copy all required fields */
+	snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size);
+	snapshot->xmin = serialized_snapshot->xmin;
+	snapshot->xmax = serialized_snapshot->xmax;
+	snapshot->xcnt = serialized_snapshot->xcnt;
+	snapshot->subxcnt = serialized_snapshot->subxcnt;
+	snapshot->suboverflowed = serialized_snapshot->suboverflowed;
+	snapshot->takenDuringRecovery = serialized_snapshot->takenDuringRecovery;
+	snapshot->curcid = serialized_snapshot->curcid;
+
+	/* Copy XIDs, if present. */
+	if (serialized_snapshot->xcnt > 0)
+		memcpy(snapshot->xip, (TransactionId *) (serialized_snapshot + 1),
+			   serialized_snapshot->xcnt * sizeof(TransactionId));
+
+	/* Copy SubXIDs, if present. */
+	if (serialized_snapshot->subxcnt > 0)
+		memcpy(snapshot->subxip,
+			   (TransactionId *) ((char *) serialized_snapshot + subxipoff),
+			   serialized_snapshot->subxcnt * sizeof(TransactionId));
+
+	/* Set the copied flag so that the caller will set refcounts correctly. */
+	snapshot->regd_count = 0;
+	snapshot->active_count = 0;
+	snapshot->copied = true;
+
+	return snapshot;
+}
+
+/*
+ * Install a restored snapshot as the transaction snapshot.
+ *
+ * The second argument is of type void * so that snapmgr.h need not include
+ * the declaration for PGPROC.
+ */
+void
+RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
+{
+	SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc);
+}
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
new file mode 100644
index 0000000..9fe2df8
--- /dev/null
+++ b/src/include/access/parallel.h
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * parallel.h
+ *	  Infrastructure for launching parallel workers
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/parallel.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PARALLEL_H
+#define PARALLEL_H
+
+#include "lib/ilist.h"
+#include "postmaster/bgworker.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+
+typedef void (*parallel_worker_main_type)(shm_toc *toc);
+
+typedef struct ParallelWorkerInfo
+{
+	BackgroundWorkerHandle *bgwhandle;
+	shm_mq_handle *error_mqh;
+} ParallelWorkerInfo;
+
+typedef struct ParallelContext
+{
+	dlist_node node;
+	SubTransactionId subid;
+	int nworkers;
+	parallel_worker_main_type entrypoint;
+	char *library_name;
+	char *function_name;
+	shm_toc_estimator estimator;
+	dsm_segment *seg;
+	shm_toc *toc;
+	ParallelWorkerInfo *worker;
+} ParallelContext;
+
+extern bool ParallelMessagePending;
+
+extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
+extern ParallelContext *CreateParallelContextForExtension(char *library_name,
+								  char *function_name, int nworkers);
+extern void InitializeParallelDSM(ParallelContext *);
+extern void LaunchParallelWorkers(ParallelContext *);
+extern void DestroyParallelContext(ParallelContext *);
+extern bool ParallelContextActive(void);
+
+extern void HandleParallelMessageInterrupt(bool signal_handler);
+extern void AtEOXact_Parallel(bool isCommit);
+extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId);
+
+#endif   /* PARALLEL_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b018aa4..6e8290f 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -260,4 +260,8 @@ extern void xact_redo(XLogReaderState *record);
 extern void xact_desc(StringInfo buf, XLogReaderState *record);
 extern const char *xact_identify(uint8 info);
 
+extern void EnterParallelMode(void);
+extern void ExitParallelMode(void);
+extern bool IsInParallelMode(void);
+
 #endif   /* XACT_H */
diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h
index f8e68c9..78583b3 100644
--- a/src/include/libpq/pqmq.h
+++ b/src/include/libpq/pqmq.h
@@ -17,6 +17,7 @@
 #include "storage/shm_mq.h"
 
 extern void	pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *);
+extern void pq_set_parallel_master(pid_t pid, BackendId backend_id);
 
 extern void pq_parse_errornotice(StringInfo str, ErrorData *edata);
 
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 1558a75..fb7754f 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -258,6 +258,7 @@ extern void check_stack_depth(void);
 
 /* in tcop/utility.c */
 extern void PreventCommandIfReadOnly(const char *cmdname);
+extern void PreventCommandIfParallelMode(const char *cmdname);
 extern void PreventCommandDuringRecovery(const char *cmdname);
 
 /* in utils/misc/guc.c */
@@ -290,7 +291,7 @@ extern bool InLocalUserIdChange(void);
 extern bool InSecurityRestrictedOperation(void);
 extern void GetUserIdAndContext(Oid *userid, bool *sec_def_context);
 extern void SetUserIdAndContext(Oid userid, bool sec_def_context);
-extern void InitializeSessionUserId(const char *rolename);
+extern void InitializeSessionUserId(const char *rolename, Oid useroid);
 extern void InitializeSessionUserIdStandalone(void);
 extern void SetSessionAuthorization(Oid userid, bool is_superuser);
 extern Oid	GetCurrentRoleId(void);
@@ -391,7 +392,7 @@ extern AuxProcType MyAuxProcType;
 extern void pg_split_opts(char **argv, int *argcp, char *optstr);
 extern void InitializeMaxBackends(void);
 extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username,
-			 char *out_dbname);
+			 Oid useroid, char *out_dbname);
 extern void BaseInit(void);
 
 /* in utils/init/miscinit.c */
diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h
index a3b3d5f..273cecb 100644
--- a/src/include/postmaster/bgworker.h
+++ b/src/include/postmaster/bgworker.h
@@ -130,6 +130,9 @@ extern PGDLLIMPORT BackgroundWorker *MyBgworkerEntry;
  */
 extern void BackgroundWorkerInitializeConnection(char *dbname, char *username);
 
+/* Just like the above, but specifying database and user by OID. */
+extern void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid);
+
 /* Block/unblock signals in a background worker process */
 extern void BackgroundWorkerBlockSignals(void);
 extern void BackgroundWorkerUnblockSignals(void);
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 0c4611b..9f4e4b9 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -46,6 +46,7 @@ extern Snapshot GetSnapshotData(Snapshot snapshot);
 
 extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
 							 TransactionId sourcexid);
+extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
 
 extern RunningTransactions GetRunningTransactionData(void);
 
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index c625562..3046e52 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -31,6 +31,7 @@ typedef enum
 {
 	PROCSIG_CATCHUP_INTERRUPT,	/* sinval catchup interrupt */
 	PROCSIG_NOTIFY_INTERRUPT,	/* listen/notify interrupt */
+	PROCSIG_PARALLEL_MESSAGE,	/* message from cooperating parallel backend */
 
 	/* Recovery conflict reasons */
 	PROCSIG_RECOVERY_CONFLICT_DATABASE,
diff --git a/src/include/utils/combocid.h b/src/include/utils/combocid.h
index 9e482ff..232f729 100644
--- a/src/include/utils/combocid.h
+++ b/src/include/utils/combocid.h
@@ -21,5 +21,8 @@
  */
 
 extern void AtEOXact_ComboCid(void);
+extern void RestoreComboCIDState(char *comboCIDstate);
+extern void SerializeComboCIDState(Size maxsize, char *start_address);
+extern Size EstimateComboCIDStateSpace(void);
 
 #endif   /* COMBOCID_H */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index abe7016..7efd427 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -64,4 +64,10 @@ extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids)
 extern void TeardownHistoricSnapshot(bool is_error);
 extern bool HistoricSnapshotActive(void);
 
+extern Size EstimateSnapshotSpace(Snapshot snapshot);
+extern void SerializeSnapshot(Snapshot snapshot, Size maxsize,
+							  char *start_address);
+extern Snapshot RestoreSnapshot(char *start_address);
+extern void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc);
+
 #endif   /* SNAPMGR_H */
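
For readers following the snapmgr.c hunks above, the estimate/serialize/restore
pattern can be illustrated with a minimal standalone sketch in plain C.  This
is not code from the patch: the Demo* types and demo_* functions are
hypothetical stand-ins for Snapshot, SerializedSnapshotData and the three new
functions, malloc() replaces MemoryContextAlloc(), and an error return replaces
elog(ERROR).  It only shows the layout idea: a fixed-size header followed
directly by the XID array and then the sub-XID array.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for TransactionId. */
typedef uint32_t DemoXid;

/* Hypothetical in-memory snapshot; the arrays live in the same allocation. */
typedef struct DemoSnapshot
{
    DemoXid     xmin;
    DemoXid     xmax;
    uint32_t    xcnt;
    uint32_t    subxcnt;
    DemoXid    *xip;
    DemoXid    *subxip;
} DemoSnapshot;

/* Fixed-size header; xcnt XIDs and then subxcnt sub-XIDs follow it. */
typedef struct DemoSerialized
{
    DemoXid     xmin;
    DemoXid     xmax;
    uint32_t    xcnt;
    uint32_t    subxcnt;
} DemoSerialized;

/* Bytes needed to serialize snap: header plus both arrays. */
size_t
demo_estimate(const DemoSnapshot *snap)
{
    return sizeof(DemoSerialized) +
        ((size_t) snap->xcnt + snap->subxcnt) * sizeof(DemoXid);
}

/* Returns 0 on success, -1 if maxsize is smaller than the estimate. */
int
demo_serialize(const DemoSnapshot *snap, size_t maxsize, char *start)
{
    DemoSerialized hdr;
    char       *p = start + sizeof(DemoSerialized);

    if (maxsize < demo_estimate(snap))
        return -1;

    hdr.xmin = snap->xmin;
    hdr.xmax = snap->xmax;
    hdr.xcnt = snap->xcnt;
    hdr.subxcnt = snap->subxcnt;
    memcpy(start, &hdr, sizeof(hdr));

    if (snap->xcnt > 0)
        memcpy(p, snap->xip, snap->xcnt * sizeof(DemoXid));
    p += snap->xcnt * sizeof(DemoXid);
    if (snap->subxcnt > 0)
        memcpy(p, snap->subxip, snap->subxcnt * sizeof(DemoXid));
    return 0;
}

/* Rebuilds a snapshot in one malloc'd block; caller frees it.  NULL on OOM. */
DemoSnapshot *
demo_restore(const char *start)
{
    DemoSerialized hdr;
    const char *p = start + sizeof(DemoSerialized);
    DemoSnapshot *snap;

    memcpy(&hdr, start, sizeof(hdr));
    snap = malloc(sizeof(DemoSnapshot) +
                  ((size_t) hdr.xcnt + hdr.subxcnt) * sizeof(DemoXid));
    if (snap == NULL)
        return NULL;

    snap->xmin = hdr.xmin;
    snap->xmax = hdr.xmax;
    snap->xcnt = hdr.xcnt;
    snap->subxcnt = hdr.subxcnt;

    /* Point the arrays into the tail of the allocation before copying. */
    snap->xip = (DemoXid *) (snap + 1);
    snap->subxip = snap->xip + hdr.xcnt;
    if (hdr.xcnt > 0)
        memcpy(snap->xip, p, hdr.xcnt * sizeof(DemoXid));
    p += hdr.xcnt * sizeof(DemoXid);
    if (hdr.subxcnt > 0)
        memcpy(snap->subxip, p, hdr.subxcnt * sizeof(DemoXid));
    return snap;
}
```

Two details of the sketch are worth noting.  The source offset of the sub-XID
array is computed from the size of the serialized header, not from the size of
the in-memory struct, since the two differ.  The restored struct's xip and
subxip pointers are also set to point into its own allocation before anything
is copied through them.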