On Sat, Sep 26, 2015 at 10:16 AM, Robert Haas <robertmh...@gmail.com> wrote:
> On Sat, Sep 26, 2015 at 8:38 AM, Robert Haas <robertmh...@gmail.com> wrote:
>>> QueryDesc's totaltime is for instrumentation information for plugins
>>> like pg_stat_statements, and we need only the total buffer usage
>>> of each worker to make it work, as the other information is already
>>> collected in the master backend, so I think that should work as I have
>>> written.
>>
>> I don't think that's right at all.  First, an extension can choose to
>> look at any part of the Instrumentation, not just the buffer usage.
>> Secondly, the buffer usage inside QueryDesc's totaltime isn't the same
>> as the global pgBufferUsage.
>
> Oh... but I'm wrong.  As long as our local pgBufferUsage gets updated
> correctly to incorporate the data from the other workers, the
> InstrStopNode(queryDesc->totaltime) will suck in those statistics.
> And the only other things getting updated are nTuples (which shouldn't
> count anything that the workers did), firsttuple (similarly), and
> counter (where the behavior is a bit more arguable, but just counting
> the master's wall-clock time is at least defensible).  So now I think
> you're right: this should be OK.
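
To spell out the sequence I have in mind, it goes roughly like this
(schematic, not a literal excerpt; the InstrStartNode/InstrStopNode
calls on queryDesc->totaltime are the preexisting ones in
standard_ExecutorRun, the rest are names from the attached patch):

    /* worker, in ParallelQueryMain */
    InstrStartParallelQuery();          /* snapshot pgBufferUsage */
    ExecutorRun(queryDesc, ForwardScanDirection, 0L);
    InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
                                        /* write the delta into the DSM */

    /* leader, in ExecParallelFinish */
    WaitForParallelWorkersToFinish(pei->pcxt);
    for (i = 0; i < pei->pcxt->nworkers; ++i)
        InstrAccumParallelQuery(&pei->buffer_usage[i]);
                                        /* pgBufferUsage += worker delta */

    /* leader, back in standard_ExecutorRun */
    InstrStopNode(queryDesc->totaltime, estate->es_processed);

So as long as ExecParallelFinish runs before we stop totaltime, the
buffer usage that pg_stat_statements sees includes the workers' counts.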

OK, so here's a patch extracted from your
parallel_seqscan_partialseqscan_v18.patch with a fairly substantial
amount of rework by me:

- I left out the Funnel node itself; this is just the infrastructure
portion of the patch.  I also left out the stop-the-executor early
stuff and the serialization of PARAM_EXEC values.  I want to have
those things, but I think they need more thought and study first.
- I reorganized the code a fair amount into a form that I thought
was clearer, and certainly is closer to what I did previously in
parallel.c.  I found your version had lots of functions with lots of
parameters, and I found that made the logic difficult to follow, at
least for me.  As part of that, I munged the interface a bit so that
execParallel.c returns a structure with a bunch of pointers in it
instead of separately returning each one as an out parameter.  I think
that's cleaner.  If we need to add more stuff in the future, that way
we don't break existing callers.  (There's a rough usage sketch just
after this list.)
- I reworked the interface with instrument.c and tried to preserve
something of an abstraction boundary there.  I also broadened the set
of statistics that code accumulates; I couldn't see any reason to make
it as narrow as you had it.
- I did a bunch of cosmetic cleanup, including changing function names
and rewriting comments.
- I replaced your code for serializing and restoring a ParamListInfo
with my version.
- I fixed the code so that it can handle collecting instrumentation
data from multiple nodes, bringing all the data back to the leader and
associating it with the right plan node.  This involved giving every
plan node a unique ID, as discussed with Tom on another recent thread.
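
For reference, leader-side use of the new interface ends up looking
roughly like this (condensed from the test code attached below, which
hard-codes a single worker; error handling and the tuple-reading loop
are omitted):

    ParallelExecutorInfo *pei;

    EnterParallelMode();
    pei = ExecInitParallelPlan(queryDesc->planstate, estate, nworkers);
    LaunchParallelWorkers(pei->pcxt);

    /* ... read tuples from pei->tqueue[] via a TupleQueueFunnel ... */

    ExecParallelFinish(pei);            /* wait for workers, pull in stats */
    DestroyParallelContext(pei->pcxt);
    ExitParallelMode();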

After I did all that, I wrote some test code, which is also attached
here, that adds a new GUC force_parallel_worker.  If you set that GUC,
when you run a query, it'll run the query in a parallel worker and
feed the results back to the master.  I've tested this and it seems to
work, at least on the queries where you'd expect it to work.  It's
just test code, so it doesn't have error checking or make any attempt
not to push down queries that will fail in parallel mode. But you can
use it to see what happens.  You can also run queries under EXPLAIN
ANALYZE this way and, lo and behold, the worker stats show up attached
to the correct plan nodes.
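
The way the stats end up on the right nodes is nothing fancy: each
worker walks its PlanState tree and folds each node's Instrumentation
into the shared-memory slot whose plan_node_id matches, and the leader
later walks its own tree and aggregates those slots back in.  Condensed
from execParallel.c below (the real code also error-checks the lookup):

    /* worker side, in ExecParallelReportInstrumentation, per node */
    for (i = 0; i < instrumentation->ps_ninstrument; ++i)
        if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id)
            break;
    SpinLockAcquire(&instrumentation->ps_instrument[i].mutex);
    InstrAggNode(&instrumentation->ps_instrument[i].instr,
                 planstate->instrument);
    SpinLockRelease(&instrumentation->ps_instrument[i].mutex);

    /* leader side, in ExecParallelRetrieveInstrumentation, same lookup;
     * no spinlock needed because the workers have already exited */
    InstrAggNode(planstate->instrument,
                 &instrumentation->ps_instrument[i].instr);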

I intend to commit this patch (but not the crappy test code, of
course) pretty soon, and then I'm going to start working on the
portion of the patch that actually adds the Funnel node, which I think
you are working on renaming to Gather.  I think that getting that part
committed is likely to be pretty straightforward; it doesn't need to
do a lot more than call this stuff and tell it to go do its thing.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From 7c5fa754fada96553930820b970da876fb1dc468 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Fri, 25 Sep 2015 17:50:19 -0400
Subject: [PATCH 2/2] Test code.

---
 src/backend/executor/execMain.c     | 47 ++++++++++++++++++++++++++++++++++++-
 src/backend/executor/execParallel.c |  4 +++-
 src/backend/utils/misc/guc.c        | 11 +++++++++
 src/include/utils/guc.h             |  2 ++
 4 files changed, 62 insertions(+), 2 deletions(-)

diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 85ff46b..4863afd 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -45,6 +45,8 @@
 #include "commands/matview.h"
 #include "commands/trigger.h"
 #include "executor/execdebug.h"
+#include "executor/execParallel.h"
+#include "executor/tqueue.h"
 #include "foreign/fdwapi.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
@@ -339,13 +341,56 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 	 * run plan
 	 */
 	if (!ScanDirectionIsNoMovement(direction))
-		ExecutePlan(estate,
+	{
+		if (force_parallel_worker && !IsParallelWorker())
+		{
+			ParallelExecutorInfo *pei;
+			TupleQueueFunnel *funnel;
+			TupleDesc tupType;
+			TupleTableSlot *slot;
+
+			EnterParallelMode();
+
+			/* Utterly disregarding sanity checks, let's try this out... */
+			pei = ExecInitParallelPlan(queryDesc->planstate, estate, 1);
+			LaunchParallelWorkers(pei->pcxt);
+
+			/* Set up to send results wherever they're supposed to go. */
+			funnel = CreateTupleQueueFunnel();
+			RegisterTupleQueueOnFunnel(funnel, pei->tqueue[0]);
+			tupType = ExecGetResultType(queryDesc->planstate);
+			slot = MakeSingleTupleTableSlot(tupType);
+
+			/* Read tuples from the worker and send them to the receiver. */
+			for (;;)
+			{
+				HeapTuple tup;
+				bool done;
+
+				tup = TupleQueueFunnelNext(funnel, false, &done);
+				if (done)
+					break;
+				Assert(tup != NULL);
+				ExecStoreTuple(tup, slot, InvalidBuffer, true);
+				(*dest->receiveSlot) (slot, dest);
+			}
+
+			/* Clean up. */
+			ExecParallelFinish(pei);
+			DestroyParallelContext(pei->pcxt);
+			ExitParallelMode();
+		}
+		else
+		{
+			ExecutePlan(estate,
 					queryDesc->planstate,
 					operation,
 					sendTuples,
 					count,
 					direction,
 					dest);
+		}
+	}
 
 	/*
 	 * shutdown tuple receiver, if we started it
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index a409a9a..9c8bf4b 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -568,7 +568,6 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	ExecutorStart(queryDesc, 0);
 	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
 	ExecutorFinish(queryDesc);
-	ExecutorEnd(queryDesc);
 
 	/* Report buffer usage during parallel execution. */
 	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE);
@@ -579,6 +578,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 		ExecParallelReportInstrumentation(queryDesc->planstate,
 										  instrumentation);
 
+	/* Must do this after capturing instrumentation. */
+	ExecutorEnd(queryDesc);
+
 	/* Cleanup. */
 	FreeQueryDesc(queryDesc);
 	(*receiver->rDestroy) (receiver);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 17053af..11c54dd 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -112,6 +112,8 @@ extern char *temp_tablespaces;
 extern bool ignore_checksum_failure;
 extern bool synchronize_seqscans;
 
+bool force_parallel_worker;
+
 #ifdef TRACE_SYNCSCAN
 extern bool trace_syncscan;
 #endif
@@ -755,6 +757,15 @@ static const unit_conversion time_unit_conversion_table[] =
 static struct config_bool ConfigureNamesBool[] =
 {
 	{
+		{"force_parallel_worker", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Force use of a parallel worker in the executor."),
+			NULL
+		},
+		&force_parallel_worker,
+		false,
+		NULL, NULL, NULL
+	},
+	{
 		{"enable_seqscan", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of sequential-scan plans."),
 			NULL
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index dc167f9..702e067 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -433,4 +433,6 @@ extern void assign_search_path(const char *newval, void *extra);
 extern bool check_wal_buffers(int *newval, void **extra, GucSource source);
 extern void assign_xlog_sync_method(int new_sync_method, void *extra);
 
+extern bool force_parallel_worker;
+
 #endif   /* GUC_H */
-- 
2.3.8 (Apple Git-58)

From ed7fdc90c3835b43f74f5bd1f45253ceef64b9e6 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Fri, 25 Sep 2015 13:57:25 -0400
Subject: [PATCH 1/2] Parallel executor support.

---
 src/backend/executor/Makefile        |   3 +-
 src/backend/executor/execParallel.c  | 585 +++++++++++++++++++++++++++++++++++
 src/backend/executor/instrument.c    |  78 +++++
 src/backend/executor/tqueue.c        |   4 +-
 src/backend/nodes/copyfuncs.c        |   1 +
 src/backend/nodes/outfuncs.c         |   1 +
 src/backend/nodes/params.c           | 155 ++++++++++
 src/backend/nodes/readfuncs.c        |   1 +
 src/backend/optimizer/plan/planner.c |   1 +
 src/backend/optimizer/plan/setrefs.c |   5 +
 src/backend/utils/adt/datum.c        | 118 +++++++
 src/include/executor/execParallel.h  |  36 +++
 src/include/executor/instrument.h    |   5 +
 src/include/nodes/params.h           |   3 +
 src/include/nodes/plannodes.h        |   1 +
 src/include/nodes/relation.h         |   2 +
 src/include/utils/datum.h            |  10 +
 17 files changed, 1007 insertions(+), 2 deletions(-)
 create mode 100644 src/backend/executor/execParallel.c
 create mode 100644 src/include/executor/execParallel.h

diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 249534b..f5e1e1a 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -13,7 +13,8 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
-       execMain.o execProcnode.o execQual.o execScan.o execTuples.o \
+       execMain.o execParallel.o execProcnode.o execQual.o \
+       execScan.o execTuples.o \
        execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \
        nodeBitmapAnd.o nodeBitmapOr.o \
        nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeHash.o \
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
new file mode 100644
index 0000000..a409a9a
--- /dev/null
+++ b/src/backend/executor/execParallel.c
@@ -0,0 +1,585 @@
+/*-------------------------------------------------------------------------
+ *
+ * execParallel.c
+ *	  Support routines for parallel execution.
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/execParallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execParallel.h"
+#include "executor/executor.h"
+#include "executor/tqueue.h"
+#include "nodes/nodeFuncs.h"
+#include "optimizer/planmain.h"
+#include "optimizer/planner.h"
+#include "storage/spin.h"
+#include "tcop/tcopprot.h"
+#include "utils/memutils.h"
+#include "utils/snapmgr.h"
+
+/*
+ * Magic numbers for parallel executor communication.  We use constants
+ * greater than any 32-bit integer here so that values < 2^32 can be used
+ * by individual parallel nodes to store their own state.
+ */
+#define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PARAMS				UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000005)
+
+#define PARALLEL_TUPLE_QUEUE_SIZE		65536
+
+/* DSM structure for accumulating per-PlanState instrumentation. */
+typedef struct SharedPlanStateInstrumentation
+{
+	int plan_node_id;
+	slock_t mutex;
+	Instrumentation	instr;
+} SharedPlanStateInstrumentation;
+
+/* DSM structure containing instrumentation for an entire plan tree. */
+struct SharedExecutorInstrumentation
+{
+	int instrument_options;
+	int ps_ninstrument;			/* # of ps_instrument structures following */
+	SharedPlanStateInstrumentation ps_instrument[FLEXIBLE_ARRAY_MEMBER];
+};
+
+/* Context object for ExecParallelEstimate. */
+typedef struct ExecParallelEstimateContext
+{
+	ParallelContext *pcxt;
+	int nnodes;
+} ExecParallelEstimateContext;
+
+/* Context object for ExecParallelInitializeDSM. */
+typedef struct ExecParallelInitializeDSMContext
+{
+	ParallelContext *pcxt;
+	SharedExecutorInstrumentation *instrumentation;
+	int nnodes;
+} ExecParallelInitializeDSMContext;
+
+/* Helper functions that run in the parallel leader. */
+static char *ExecSerializePlan(Plan *plan, List *rangetable);
+static bool ExecParallelEstimate(PlanState *node,
+					 ExecParallelEstimateContext *e);
+static bool ExecParallelInitializeDSM(PlanState *node,
+					 ExecParallelInitializeDSMContext *d);
+static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt);
+static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
+						  SharedExecutorInstrumentation *instrumentation);
+
+/* Helper functions that run in the parallel worker. */
+static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc);
+static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
+
+/*
+ * Create a serialized representation of the plan to be sent to each worker.
+ */
+static char *
+ExecSerializePlan(Plan *plan, List *rangetable)
+{
+	PlannedStmt *pstmt;
+	ListCell   *tlist;
+
+	/* We can't scribble on the original plan, so make a copy. */
+	plan = copyObject(plan);
+
+	/*
+	 * The worker will start its own copy of the executor, and that copy will
+	 * insert a junk filter if the toplevel node has any resjunk entries. We
+	 * don't want that to happen, because while resjunk columns shouldn't be
+	 * sent back to the user, here the tuples are coming back to another
+	 * backend which may very well need them.  So mutate the target list
+	 * accordingly.  This is sort of a hack; there might be better ways to do
+	 * this...
+	 */
+	foreach(tlist, plan->targetlist)
+	{
+		TargetEntry *tle = (TargetEntry *) lfirst(tlist);
+
+		tle->resjunk = false;
+	}
+
+	/*
+	 * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
+	 * for our purposes, but the worker will need at least a minimal
+	 * PlannedStmt to start the executor.
+	 */
+	pstmt = makeNode(PlannedStmt);
+	pstmt->commandType = CMD_SELECT;
+	pstmt->queryId = 0;
+	pstmt->hasReturning = 0;
+	pstmt->hasModifyingCTE = 0;
+	pstmt->canSetTag = 1;
+	pstmt->transientPlan = 0;
+	pstmt->planTree = plan;
+	pstmt->rtable = rangetable;
+	pstmt->resultRelations = NIL;
+	pstmt->utilityStmt = NULL;
+	pstmt->subplans = NIL;
+	pstmt->rewindPlanIDs = NULL;
+	pstmt->rowMarks = NIL;
+	pstmt->nParamExec = 0;
+	pstmt->relationOids = NIL;
+	pstmt->invalItems = NIL;	/* workers can't replan anyway... */
+	pstmt->hasRowSecurity = false;
+
+	/* Return serialized copy of our dummy PlannedStmt. */
+	return nodeToString(pstmt);
+}
+
+/*
+ * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes
+ * may need some state which is shared across all parallel workers.  Before
+ * we size the DSM, give them a chance to call shm_toc_estimate_chunk or
+ * shm_toc_estimate_keys on &pcxt->estimator.
+ *
+ * While we're at it, count the number of PlanState nodes in the tree, so
+ * we know how many SharedPlanStateInstrumentation structures we need.
+ */
+static bool
+ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
+{
+	if (planstate == NULL)
+		return false;
+
+	/* Count this node. */
+	e->nnodes++;
+
+	/*
+	 * XXX. Call estimators for parallel-aware nodes here, when we have
+	 * some.
+	 */
+
+	return planstate_tree_walker(planstate, ExecParallelEstimate, e);
+}
+
+/*
+ * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes
+ * may need to initialize shared state in the DSM before parallel workers
+ * are available.  They can allocate the space they previously estimated using
+ * shm_toc_allocate, and add the keys they previously estimated using
+ * shm_toc_insert, in each case targeting pcxt->toc.
+ */
+static bool
+ExecParallelInitializeDSM(PlanState *planstate,
+						  ExecParallelInitializeDSMContext *d)
+{
+	if (planstate == NULL)
+		return false;
+
+	/* If instrumentation is enabled, initialize array slot for this node. */
+	if (d->instrumentation != NULL)
+	{
+		SharedPlanStateInstrumentation *instrumentation;
+
+		instrumentation = &d->instrumentation->ps_instrument[d->nnodes];
+		Assert(d->nnodes < d->instrumentation->ps_ninstrument);
+		instrumentation->plan_node_id = planstate->plan->plan_node_id;
+		SpinLockInit(&instrumentation->mutex);
+		InstrInit(&instrumentation->instr,
+				  d->instrumentation->instrument_options);
+	}
+
+	/* Count this node. */
+	d->nnodes++;
+
+	/*
+	 * XXX. Call initializers for parallel-aware plan nodes, when we have
+	 * some.
+	 */
+
+	return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
+}
+
+/*
+ * Set up the response queues that backend workers will use to return
+ * tuples to the main backend.
+ */
+static shm_mq_handle **
+ExecParallelSetupTupleQueues(ParallelContext *pcxt)
+{
+	shm_mq_handle **responseq;
+	char	   *tqueuespace;
+	int			i;
+
+	/* Skip this if no workers. */
+	if (pcxt->nworkers == 0)
+		return NULL;
+
+	/* Allocate memory for shared memory queue handles. */
+	responseq = (shm_mq_handle **)
+		palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
+
+	/* Allocate space from the DSM for the queues themselves. */
+	tqueuespace = shm_toc_allocate(pcxt->toc,
+								 PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+
+	/* Create the queues, and become the receiver for each. */
+	for (i = 0; i < pcxt->nworkers; ++i)
+	{
+		shm_mq	   *mq;
+
+		mq = shm_mq_create(tqueuespace + i * PARALLEL_TUPLE_QUEUE_SIZE,
+						   (Size) PARALLEL_TUPLE_QUEUE_SIZE);
+
+		shm_mq_set_receiver(mq, MyProc);
+		responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
+	}
+
+	/* Add array of queues to shm_toc, so others can find it. */
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
+
+	/* Return array of handles. */
+	return responseq;
+}
+
+/*
+ * Sets up the required infrastructure for backend workers to perform
+ * execution and return results to the main backend.
+ */
+ParallelExecutorInfo *
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+{
+	ParallelExecutorInfo *pei;
+	ParallelContext *pcxt;
+	ExecParallelEstimateContext e;
+	ExecParallelInitializeDSMContext d;
+	char	   *pstmt_data;
+	char	   *pstmt_space;
+	char	   *param_space;
+	BufferUsage *bufusage_space;
+	SharedExecutorInstrumentation *instrumentation = NULL;
+	int			pstmt_len;
+	int			param_len;
+	int			instrumentation_len = 0;
+
+	/* Allocate object for return value. */
+	pei = palloc0(sizeof(ParallelExecutorInfo));
+	pei->planstate = planstate;
+
+	/* Fix up and serialize plan to be sent to workers. */
+	pstmt_data = ExecSerializePlan(planstate->plan, estate->es_range_table);
+
+	/* Create a parallel context. */
+	pcxt = CreateParallelContext(ParallelQueryMain, nworkers);
+	pei->pcxt = pcxt;
+
+	/*
+	 * Before telling the parallel context to create a dynamic shared memory
+	 * segment, we need to figure out how big it should be.  Estimate space
+	 * for the various things we need to store.
+	 */
+
+	/* Estimate space for serialized PlannedStmt. */
+	pstmt_len = strlen(pstmt_data) + 1;
+	shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Estimate space for serialized ParamListInfo. */
+	param_len = EstimateParamListSpace(estate->es_param_list_info);
+	shm_toc_estimate_chunk(&pcxt->estimator, param_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/*
+	 * Estimate space for BufferUsage.
+	 *
+	 * If EXPLAIN is not in use and there are no extensions loaded that care,
+	 * we could skip this.  But we have no way of knowing whether anyone's
+	 * looking at pgBufferUsage, so do it unconditionally.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   sizeof(BufferUsage) * pcxt->nworkers);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/* Estimate space for tuple queues. */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+	/*
+	 * Give parallel-aware nodes a chance to add to the estimates, and get
+	 * a count of how many PlanState nodes there are.
+	 */
+	e.pcxt = pcxt;
+	e.nnodes = 0;
+	ExecParallelEstimate(planstate, &e);
+
+	/* Estimate space for instrumentation, if required. */
+	if (estate->es_instrument)
+	{
+		instrumentation_len =
+			offsetof(SharedExecutorInstrumentation, ps_instrument)
+			+ sizeof(SharedPlanStateInstrumentation) * e.nnodes;
+		shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+
+	/* Everyone's had a chance to ask for space, so now create the DSM. */
+	InitializeParallelDSM(pcxt);
+
+	/*
+	 * OK, now we have a dynamic shared memory segment, and it should be big
+	 * enough to store all of the data we estimated we would want to put into
+	 * it, plus whatever general stuff (not specifically executor-related) the
+	 * ParallelContext itself needs to store there.  None of the space we
+	 * asked for has been allocated or initialized yet, though, so do that.
+	 */
+
+	/* Store serialized PlannedStmt. */
+	pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
+	memcpy(pstmt_space, pstmt_data, pstmt_len);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
+
+	/* Store serialized ParamListInfo. */
+	param_space = shm_toc_allocate(pcxt->toc, param_len);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, param_space);
+	SerializeParamList(estate->es_param_list_info, &param_space);
+
+	/* Allocate space for each worker's BufferUsage; no need to initialize. */
+	bufusage_space = shm_toc_allocate(pcxt->toc,
+									  sizeof(BufferUsage) * pcxt->nworkers);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
+	pei->buffer_usage = bufusage_space;
+
+	/* Set up tuple queues. */
+	pei->tqueue = ExecParallelSetupTupleQueues(pcxt);
+
+	/*
+	 * If instrumentation options were supplied, allocate space for the
+	 * data.  It only gets partially initialized here; the rest happens
+	 * during ExecParallelInitializeDSM.
+	 */
+	if (estate->es_instrument)
+	{
+		instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
+		instrumentation->instrument_options = estate->es_instrument;
+		instrumentation->ps_ninstrument = e.nnodes;
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
+					   instrumentation);
+		pei->instrumentation = instrumentation;
+	}
+
+	/*
+	 * Give parallel-aware nodes a chance to initialize their shared data.
+	 * This also initializes the elements of instrumentation->ps_instrument,
+	 * if it exists.
+	 */
+	d.pcxt = pcxt;
+	d.instrumentation = instrumentation;
+	d.nnodes = 0;
+	ExecParallelInitializeDSM(planstate, &d);
+
+	/*
+	 * Make sure that the world hasn't shifted under our feet.  This could
+	 * probably just be an Assert(), but let's be conservative for now.
+	 */
+	if (e.nnodes != d.nnodes)
+		elog(ERROR, "inconsistent count of PlanState nodes");
+
+	/* OK, we're ready to rock and roll. */
+	return pei;
+}
+
+/*
+ * Copy instrumentation information about this node and its descendants from
+ * dynamic shared memory.
+ */
+static bool
+ExecParallelRetrieveInstrumentation(PlanState *planstate,
+						  SharedExecutorInstrumentation *instrumentation)
+{
+	int		i;
+	int		plan_node_id = planstate->plan->plan_node_id;
+	SharedPlanStateInstrumentation *ps_instrument;
+
+	/* Find the instrumentation for this node. */
+	for (i = 0; i < instrumentation->ps_ninstrument; ++i)
+		if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id)
+			break;
+	if (i >= instrumentation->ps_ninstrument)
+		elog(ERROR, "plan node %d not found", plan_node_id);
+
+	/* No need to acquire the spinlock here; workers have exited already. */
+	ps_instrument = &instrumentation->ps_instrument[i];
+	InstrAggNode(planstate->instrument, &ps_instrument->instr);
+
+	return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
+								 instrumentation);
+}
+
+/*
+ * Finish parallel execution.  We wait for parallel workers to finish, and
+ * accumulate their buffer usage and instrumentation.
+ */
+void
+ExecParallelFinish(ParallelExecutorInfo *pei)
+{
+	int		i;
+
+	/* First, wait for the workers to finish. */
+	WaitForParallelWorkersToFinish(pei->pcxt);
+
+	/* Next, accumulate buffer usage. */
+	for (i = 0; i < pei->pcxt->nworkers; ++i)
+		InstrAccumParallelQuery(&pei->buffer_usage[i]);
+
+	/* Finally, accumulate instrumentation, if any. */
+	if (pei->instrumentation)
+		ExecParallelRetrieveInstrumentation(pei->planstate,
+											pei->instrumentation);
+}
+
+/*
+ * Create a DestReceiver to write tuples we produce to the shm_mq designated
+ * for that purpose.
+ */
+static DestReceiver *
+ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
+{
+	char	   *mqspace;
+	shm_mq	   *mq;
+
+	mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE);
+	mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
+	mq = (shm_mq *) mqspace;
+	shm_mq_set_sender(mq, MyProc);
+	return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
+}
+
+/*
+ * Create a QueryDesc for the PlannedStmt we are to execute, and return it.
+ */
+static QueryDesc *
+ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
+						 int instrument_options)
+{
+	char	   *pstmtspace;
+	char	   *paramspace;
+	PlannedStmt *pstmt;
+	ParamListInfo paramLI;
+
+	/* Reconstruct leader-supplied PlannedStmt. */
+	pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
+	pstmt = (PlannedStmt *) stringToNode(pstmtspace);
+
+	/* Reconstruct ParamListInfo. */
+	paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS);
+	paramLI = RestoreParamList(&paramspace);
+
+	/*
+	 * Create a QueryDesc for the query.
+	 *
+	 * It's not obvious how to obtain the query string from here; and even if
+	 * we could, copying it would take more cycles than not copying it.  But
+	 * it's a bit unsatisfying to just use a dummy string here, so consider
+	 * revising this someday.
+	 */
+	return CreateQueryDesc(pstmt,
+						   "<parallel query>",
+						   GetActiveSnapshot(), InvalidSnapshot,
+						   receiver, paramLI, instrument_options);
+}
+
+/*
+ * Copy instrumentation information from this node and its descendants into
+ * dynamic shared memory, so that the parallel leader can retrieve it.
+ */
+static bool
+ExecParallelReportInstrumentation(PlanState *planstate,
+						  SharedExecutorInstrumentation *instrumentation)
+{
+	int		i;
+	int		plan_node_id = planstate->plan->plan_node_id;
+	SharedPlanStateInstrumentation *ps_instrument;
+
+	/*
+	 * If we shuffled the plan_node_id values in ps_instrument into sorted
+	 * order, we could use binary search here.  This might matter someday
+	 * if we're pushing down sufficiently large plan trees.  For now, do it
+	 * the slow, dumb way.
+	 */
+	for (i = 0; i < instrumentation->ps_ninstrument; ++i)
+		if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id)
+			break;
+	if (i >= instrumentation->ps_ninstrument)
+		elog(ERROR, "plan node %d not found", plan_node_id);
+
+	/*
+	 * There's one SharedPlanStateInstrumentation per plan_node_id, so we
+	 * must use a spinlock in case multiple workers report at the same time.
+	 */
+	ps_instrument = &instrumentation->ps_instrument[i];
+	SpinLockAcquire(&ps_instrument->mutex);
+	InstrAggNode(&ps_instrument->instr, planstate->instrument);
+	SpinLockRelease(&ps_instrument->mutex);
+
+	return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
+								 instrumentation);
+}
+
+/*
+ * Main entrypoint for parallel query worker processes.
+ *
+ * We reach this function from ParallelWorkerMain, so the setup necessary to
+ * create a sensible parallel environment has already been done;
+ * ParallelWorkerMain worries about stuff like the transaction state, combo
+ * CID mappings, and GUC values, so we don't need to deal with any of that here.
+ *
+ * Our job is to deal with concerns specific to the executor.  The parallel
+ * group leader will have stored a serialized PlannedStmt, and it's our job
+ * to execute that plan and write the resulting tuples to the appropriate
+ * tuple queue.  Various bits of supporting information that we need in order
+ * to do this are also stored in the dsm_segment and can be accessed through
+ * the shm_toc.
+ */
+static void
+ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
+{
+	BufferUsage *buffer_usage;
+	DestReceiver *receiver;
+	QueryDesc  *queryDesc;
+	SharedExecutorInstrumentation *instrumentation;
+	int			instrument_options = 0;
+
+	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
+	receiver = ExecParallelGetReceiver(seg, toc);
+	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION);
+	if (instrumentation != NULL)
+		instrument_options = instrumentation->instrument_options;
+	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
+
+	/* Prepare to track buffer usage during query execution. */
+	InstrStartParallelQuery();
+
+	/* Start up the executor, have it run the plan, and then shut it down. */
+	ExecutorStart(queryDesc, 0);
+	ExecutorRun(queryDesc, ForwardScanDirection, 0L);
+	ExecutorFinish(queryDesc);
+	ExecutorEnd(queryDesc);
+
+	/* Report buffer usage during parallel execution. */
+	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE);
+	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
+
+	/* Report instrumentation data if any instrumentation options are set. */
+	if (instrumentation != NULL)
+		ExecParallelReportInstrumentation(queryDesc->planstate,
+										  instrumentation);
+
+	/* Cleanup. */
+	FreeQueryDesc(queryDesc);
+	(*receiver->rDestroy) (receiver);
+}
diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c
index f5351eb..bf509b1 100644
--- a/src/backend/executor/instrument.c
+++ b/src/backend/executor/instrument.c
@@ -18,7 +18,9 @@
 #include "executor/instrument.h"
 
 BufferUsage pgBufferUsage;
+static BufferUsage save_pgBufferUsage;
 
+static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add);
 static void BufferUsageAccumDiff(BufferUsage *dst,
 					 const BufferUsage *add, const BufferUsage *sub);
 
@@ -47,6 +49,15 @@ InstrAlloc(int n, int instrument_options)
 	return instr;
 }
 
+/* Initialize a pre-allocated instrumentation structure. */
+void
+InstrInit(Instrumentation *instr, int instrument_options)
+{
+	memset(instr, 0, sizeof(Instrumentation));
+	instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0;
+	instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
+}
+
 /* Entry to a plan node */
 void
 InstrStartNode(Instrumentation *instr)
@@ -127,6 +138,73 @@ InstrEndLoop(Instrumentation *instr)
 	instr->tuplecount = 0;
 }
 
+/* aggregate instrumentation information */
+void
+InstrAggNode(Instrumentation *dst, Instrumentation *add)
+{
+	if (!dst->running && add->running)
+	{
+		dst->running = true;
+		dst->firsttuple = add->firsttuple;
+	}
+	else if (dst->running && add->running && dst->firsttuple > add->firsttuple)
+		dst->firsttuple = add->firsttuple;
+
+	INSTR_TIME_ADD(dst->counter, add->counter);
+
+	dst->tuplecount += add->tuplecount;
+	dst->startup += add->startup;
+	dst->total += add->total;
+	dst->ntuples += add->ntuples;
+	dst->nloops += add->nloops;
+	dst->nfiltered1 += add->nfiltered1;
+	dst->nfiltered2 += add->nfiltered2;
+
+	/* Add the source node's buffer usage to the destination's totals */
+	if (dst->need_bufusage)
+		BufferUsageAdd(&dst->bufusage, &add->bufusage);
+}
+
+/* note current values during parallel executor startup */
+void
+InstrStartParallelQuery(void)
+{
+	save_pgBufferUsage = pgBufferUsage;
+}
+
+/* report usage after parallel executor shutdown */
+void
+InstrEndParallelQuery(BufferUsage *result)
+{
+	memset(result, 0, sizeof(BufferUsage));
+	BufferUsageAccumDiff(result, &pgBufferUsage, &save_pgBufferUsage);
+}
+
+/* accumulate work done by workers in leader's stats */
+void
+InstrAccumParallelQuery(BufferUsage *result)
+{
+	BufferUsageAdd(&pgBufferUsage, result);
+}
+
+/* dst += add */
+static void
+BufferUsageAdd(BufferUsage *dst, const BufferUsage *add)
+{
+	dst->shared_blks_hit += add->shared_blks_hit;
+	dst->shared_blks_read += add->shared_blks_read;
+	dst->shared_blks_dirtied += add->shared_blks_dirtied;
+	dst->shared_blks_written += add->shared_blks_written;
+	dst->local_blks_hit += add->local_blks_hit;
+	dst->local_blks_read += add->local_blks_read;
+	dst->local_blks_dirtied += add->local_blks_dirtied;
+	dst->local_blks_written += add->local_blks_written;
+	dst->temp_blks_read += add->temp_blks_read;
+	dst->temp_blks_written += add->temp_blks_written;
+	INSTR_TIME_ADD(dst->blk_read_time, add->blk_read_time);
+	INSTR_TIME_ADD(dst->blk_write_time, add->blk_write_time);
+}
+
 /* dst += add - sub */
 static void
 BufferUsageAccumDiff(BufferUsage *dst,
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index d0edf4e..67143d3 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -66,7 +66,9 @@ tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
 static void
 tqueueShutdownReceiver(DestReceiver *self)
 {
-	/* do nothing */
+	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
+
+	shm_mq_detach(shm_mq_get_queue(tqueue->handle));
 }
 
 /*
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 62355aa..4b4ddec 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -112,6 +112,7 @@ CopyPlanFields(const Plan *from, Plan *newnode)
 	COPY_SCALAR_FIELD(total_cost);
 	COPY_SCALAR_FIELD(plan_rows);
 	COPY_SCALAR_FIELD(plan_width);
+	COPY_SCALAR_FIELD(plan_node_id);
 	COPY_NODE_FIELD(targetlist);
 	COPY_NODE_FIELD(qual);
 	COPY_NODE_FIELD(lefttree);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index c91273c..ee9c360 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -271,6 +271,7 @@ _outPlanInfo(StringInfo str, const Plan *node)
 	WRITE_FLOAT_FIELD(total_cost, "%.2f");
 	WRITE_FLOAT_FIELD(plan_rows, "%.0f");
 	WRITE_INT_FIELD(plan_width);
+	WRITE_INT_FIELD(plan_node_id);
 	WRITE_NODE_FIELD(targetlist);
 	WRITE_NODE_FIELD(qual);
 	WRITE_NODE_FIELD(lefttree);
diff --git a/src/backend/nodes/params.c b/src/backend/nodes/params.c
index fb803f8..d093263 100644
--- a/src/backend/nodes/params.c
+++ b/src/backend/nodes/params.c
@@ -16,6 +16,7 @@
 #include "postgres.h"
 
 #include "nodes/params.h"
+#include "storage/shmem.h"
 #include "utils/datum.h"
 #include "utils/lsyscache.h"
 
@@ -73,3 +74,157 @@ copyParamList(ParamListInfo from)
 
 	return retval;
 }
+
+/*
+ * Estimate the amount of space required to serialize a ParamListInfo.
+ */
+Size
+EstimateParamListSpace(ParamListInfo paramLI)
+{
+	int		i;
+	Size	sz = sizeof(int);
+
+	if (paramLI == NULL || paramLI->numParams <= 0)
+		return sz;
+
+	for (i = 0; i < paramLI->numParams; i++)
+	{
+		ParamExternData *prm = &paramLI->params[i];
+		int16		typLen;
+		bool		typByVal;
+
+		/* give hook a chance in case parameter is dynamic */
+		if (!OidIsValid(prm->ptype) && paramLI->paramFetch != NULL)
+			(*paramLI->paramFetch) (paramLI, i + 1);
+
+		sz = add_size(sz, sizeof(Oid));			/* space for type OID */
+		sz = add_size(sz, sizeof(uint16));		/* space for pflags */
+
+		/* space for datum/isnull */
+		if (OidIsValid(prm->ptype))
+			get_typlenbyval(prm->ptype, &typLen, &typByVal);
+		else
+		{
+			/* If no type OID, assume by-value, like copyParamList does. */
+			typLen = sizeof(Datum);
+			typByVal = true;
+		}
+		sz = add_size(sz,
+			datumEstimateSpace(prm->value, prm->isnull, typByVal, typLen));
+	}
+
+	return sz;
+}
+
+/*
+ * Serialize a ParamListInfo structure into caller-provided storage.
+ *
+ * We write the number of parameters first, as a 4-byte integer, and then
+ * write details for each parameter in turn.  The details for each parameter
+ * consist of a 4-byte type OID, 2 bytes of flags, and then the datum as
+ * serialized by datumSerialize().  The caller is responsible for ensuring
+ * that there is enough storage to store the number of bytes that will be
+ * written; use EstimateParamListSpace to find out how many will be needed.
+ * *start_address is updated to point to the byte immediately following those
+ * written.
+ *
+ * RestoreParamList can be used to recreate a ParamListInfo based on the
+ * serialized representation; this will be a static, self-contained copy
+ * just as copyParamList would create.
+ */
+void
+SerializeParamList(ParamListInfo paramLI, char **start_address)
+{
+	int			nparams;
+	int			i;
+
+	/* Write number of parameters. */
+	if (paramLI == NULL || paramLI->numParams <= 0)
+		nparams = 0;
+	else
+		nparams = paramLI->numParams;
+	memcpy(*start_address, &nparams, sizeof(int));
+	*start_address += sizeof(int);
+
+	/* Write each parameter in turn. */
+	for (i = 0; i < nparams; i++)
+	{
+		ParamExternData *prm = &paramLI->params[i];
+		int16		typLen;
+		bool		typByVal;
+
+		/* give hook a chance in case parameter is dynamic */
+		if (!OidIsValid(prm->ptype) && paramLI->paramFetch != NULL)
+			(*paramLI->paramFetch) (paramLI, i + 1);
+
+		/* Write type OID. */
+		memcpy(*start_address, &prm->ptype, sizeof(Oid));
+		*start_address += sizeof(Oid);
+
+		/* Write flags. */
+		memcpy(*start_address, &prm->pflags, sizeof(uint16));
+		*start_address += sizeof(uint16);
+
+		/* Write datum/isnull. */
+		if (OidIsValid(prm->ptype))
+			get_typlenbyval(prm->ptype, &typLen, &typByVal);
+		else
+		{
+			/* If no type OID, assume by-value, like copyParamList does. */
+			typLen = sizeof(Datum);
+			typByVal = true;
+		}
+		datumSerialize(prm->value, prm->isnull, typByVal, typLen,
+					   start_address);
+	}
+}
+
+/*
+ * Restore a ParamListInfo structure serialized by SerializeParamList.
+ *
+ * The result is allocated in CurrentMemoryContext.
+ *
+ * Note: the result is a static, self-contained set of parameter values,
+ * just as copyParamList would produce.  Any dynamic parameter hooks were
+ * already invoked at serialization time, so none are installed here; all
+ * parameter values come back fully materialized.
+ */
+ParamListInfo
+RestoreParamList(char **start_address)
+{
+	ParamListInfo paramLI;
+	Size		size;
+	int			i;
+	int			nparams;
+
+	memcpy(&nparams, *start_address, sizeof(int));
+	*start_address += sizeof(int);
+
+	size = offsetof(ParamListInfoData, params) +
+		nparams * sizeof(ParamExternData);
+
+	paramLI = (ParamListInfo) palloc(size);
+	paramLI->paramFetch = NULL;
+	paramLI->paramFetchArg = NULL;
+	paramLI->parserSetup = NULL;
+	paramLI->parserSetupArg = NULL;
+	paramLI->numParams = nparams;
+
+	for (i = 0; i < nparams; i++)
+	{
+		ParamExternData *prm = &paramLI->params[i];
+
+		/* Read type OID. */
+		memcpy(&prm->ptype, *start_address, sizeof(Oid));
+		*start_address += sizeof(Oid);
+
+		/* Read flags. */
+		memcpy(&prm->pflags, *start_address, sizeof(uint16));
+		*start_address += sizeof(uint16);
+
+		/* Read datum/isnull. */
+		prm->value = datumRestore(start_address, &prm->isnull);
+	}
+
+	return paramLI;
+}
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 08519ed..72368ab 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1413,6 +1413,7 @@ ReadCommonPlan(Plan *local_node)
 	READ_FLOAT_FIELD(total_cost);
 	READ_FLOAT_FIELD(plan_rows);
 	READ_INT_FIELD(plan_width);
+	READ_INT_FIELD(plan_node_id);
 	READ_NODE_FIELD(targetlist);
 	READ_NODE_FIELD(qual);
 	READ_NODE_FIELD(lefttree);
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 06be922..e1ee67c 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -196,6 +196,7 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
 	glob->nParamExec = 0;
 	glob->lastPHId = 0;
 	glob->lastRowMarkId = 0;
+	glob->lastPlanNodeId = 0;
 	glob->transientPlan = false;
 	glob->hasRowSecurity = false;
 
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index daeb584..3c81697 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -174,6 +174,8 @@ static bool extract_query_dependencies_walker(Node *node,
  * Currently, relations and user-defined functions are the only types of
  * objects that are explicitly tracked this way.
  *
+ * 7. We assign every plan node in the tree a unique ID.
+ *
  * We also perform one final optimization step, which is to delete
  * SubqueryScan plan nodes that aren't doing anything useful (ie, have
  * no qual and a no-op targetlist).  The reason for doing this last is that
@@ -436,6 +438,9 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
 	if (plan == NULL)
 		return NULL;
 
+	/* Assign this node a unique ID. */
+	plan->plan_node_id = root->glob->lastPlanNodeId++;
+
 	/*
 	 * Plan-type-specific fixes
 	 */
diff --git a/src/backend/utils/adt/datum.c b/src/backend/utils/adt/datum.c
index e8af030..3d9e354 100644
--- a/src/backend/utils/adt/datum.c
+++ b/src/backend/utils/adt/datum.c
@@ -246,3 +246,121 @@ datumIsEqual(Datum value1, Datum value2, bool typByVal, int typLen)
 	}
 	return res;
 }
+
+/*-------------------------------------------------------------------------
+ * datumEstimateSpace
+ *
+ * Compute the amount of space that datumSerialize will require for a
+ * particular Datum.
+ *-------------------------------------------------------------------------
+ */
+Size
+datumEstimateSpace(Datum value, bool isnull, bool typByVal, int typLen)
+{
+	Size	sz = sizeof(int);
+
+	if (!isnull)
+	{
+		/* no need to use add_size, can't overflow */
+		if (typByVal)
+			sz += sizeof(Datum);
+		else
+			sz += datumGetSize(value, typByVal, typLen);
+	}
+
+	return sz;
+}
+
+/*-------------------------------------------------------------------------
+ * datumSerialize
+ *
+ * Serialize a possibly-NULL datum into caller-provided storage.
+ *
+ * The format is as follows: first, we write a 4-byte header word, which
+ * is either the length of a pass-by-reference datum, -1 for a
+ * pass-by-value datum, or -2 for a NULL.  If the value is NULL, nothing
+ * further is written.  If it is pass-by-value, sizeof(Datum) bytes
+ * follow.  Otherwise, the number of bytes indicated by the header word
+ * follow.  The caller is responsible for ensuring that there is enough
+ * storage to store the number of bytes that will be written; use
+ * datumEstimateSpace() to find out how many will be needed.
+ * *start_address is updated to point to the byte immediately following
+ * those written.
+ *-------------------------------------------------------------------------
+ */
+void
+datumSerialize(Datum value, bool isnull, bool typByVal, int typLen,
+			   char **start_address)
+{
+	int		header;
+
+	/* Write header word. */
+	if (isnull)
+		header = -2;
+	else if (typByVal)
+		header = -1;
+	else
+		header = datumGetSize(value, typByVal, typLen);
+	memcpy(*start_address, &header, sizeof(int));
+	*start_address += sizeof(int);
+
+	/* If not null, write payload bytes. */
+	if (!isnull)
+	{
+		if (typByVal)
+		{
+			memcpy(*start_address, &value, sizeof(Datum));
+			*start_address += sizeof(Datum);
+		}
+		else
+		{
+			memcpy(*start_address, DatumGetPointer(value), header);
+			*start_address += header;
+		}
+	}
+}
+
+/*-------------------------------------------------------------------------
+ * datumRestore
+ *
+ * Restore a possibly-NULL datum previously serialized by datumSerialize.
+ * *start_address is updated according to the number of bytes consumed.
+ *-------------------------------------------------------------------------
+ */
+Datum
+datumRestore(char **start_address, bool *isnull)
+{
+	int		header;
+	void   *d;
+
+	/* Read header word. */
+	memcpy(&header, *start_address, sizeof(int));
+	*start_address += sizeof(int);
+
+	/* If this datum is NULL, we can stop here. */
+	if (header == -2)
+	{
+		*isnull = true;
+		return (Datum) 0;
+	}
+
+	/* OK, datum is not null. */
+	*isnull = false;
+
+	/* If this datum is pass-by-value, sizeof(Datum) bytes follow. */
+	if (header == -1)
+	{
+		Datum		val;
+
+		memcpy(&val, *start_address, sizeof(Datum));
+		*start_address += sizeof(Datum);
+		return val;
+	}
+
+	/* Pass-by-reference case; copy indicated number of bytes. */
+	Assert(header > 0);
+	d = palloc(header);
+	memcpy(d, *start_address, header);
+	*start_address += header;
+	return PointerGetDatum(d);
+}
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
new file mode 100644
index 0000000..4fc797a
--- /dev/null
+++ b/src/include/executor/execParallel.h
@@ -0,0 +1,36 @@
+/*--------------------------------------------------------------------
+ * execParallel.h
+ *		POSTGRES parallel execution interface
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/include/executor/execParallel.h
+ *--------------------------------------------------------------------
+ */
+
+#ifndef EXECPARALLEL_H
+#define EXECPARALLEL_H
+
+#include "access/parallel.h"
+#include "nodes/execnodes.h"
+#include "nodes/parsenodes.h"
+#include "nodes/plannodes.h"
+
+typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
+
+typedef struct ParallelExecutorInfo
+{
+	PlanState *planstate;
+	ParallelContext *pcxt;
+	BufferUsage *buffer_usage;
+	SharedExecutorInstrumentation *instrumentation;
+	shm_mq_handle **tqueue;
+}	ParallelExecutorInfo;
+
+extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
+					 EState *estate, int nworkers);
+extern void ExecParallelFinish(ParallelExecutorInfo *pei);
+
+#endif   /* EXECPARALLEL_H */
diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h
index c9a2129..f28e56c 100644
--- a/src/include/executor/instrument.h
+++ b/src/include/executor/instrument.h
@@ -66,8 +66,13 @@ typedef struct Instrumentation
 extern PGDLLIMPORT BufferUsage pgBufferUsage;
 
 extern Instrumentation *InstrAlloc(int n, int instrument_options);
+extern void InstrInit(Instrumentation *instr, int instrument_options);
 extern void InstrStartNode(Instrumentation *instr);
 extern void InstrStopNode(Instrumentation *instr, double nTuples);
 extern void InstrEndLoop(Instrumentation *instr);
+extern void InstrAggNode(Instrumentation *dst, Instrumentation *add);
+extern void InstrStartParallelQuery(void);
+extern void InstrEndParallelQuery(BufferUsage *result);
+extern void InstrAccumParallelQuery(BufferUsage *result);
 
 #endif   /* INSTRUMENT_H */
diff --git a/src/include/nodes/params.h b/src/include/nodes/params.h
index a0f7dd0..83bebde 100644
--- a/src/include/nodes/params.h
+++ b/src/include/nodes/params.h
@@ -102,5 +102,8 @@ typedef struct ParamExecData
 
 /* Functions found in src/backend/nodes/params.c */
 extern ParamListInfo copyParamList(ParamListInfo from);
+extern Size EstimateParamListSpace(ParamListInfo paramLI);
+extern void SerializeParamList(ParamListInfo paramLI, char **start_address);
+extern ParamListInfo RestoreParamList(char **start_address);
 
 #endif   /* PARAMS_H */
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index cc259f1..1e2d2bb 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -111,6 +111,7 @@ typedef struct Plan
 	/*
 	 * Common structural data for all Plan types.
 	 */
+	int			plan_node_id;	/* unique across entire final plan tree */
 	List	   *targetlist;		/* target list to be computed at this node */
 	List	   *qual;			/* implicitly-ANDed qual conditions */
 	struct Plan *lefttree;		/* input plan tree(s) */
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 79bed33..961b5d1 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -99,6 +99,8 @@ typedef struct PlannerGlobal
 
 	Index		lastRowMarkId;	/* highest PlanRowMark ID assigned */
 
+	int			lastPlanNodeId;	/* highest plan node ID assigned */
+
 	bool		transientPlan;	/* redo plan when TransactionXmin changes? */
 
 	bool		hasRowSecurity; /* row security applied? */
diff --git a/src/include/utils/datum.h b/src/include/utils/datum.h
index c572f79..e9d4be5 100644
--- a/src/include/utils/datum.h
+++ b/src/include/utils/datum.h
@@ -46,4 +46,14 @@ extern Datum datumTransfer(Datum value, bool typByVal, int typLen);
 extern bool datumIsEqual(Datum value1, Datum value2,
 			 bool typByVal, int typLen);
 
+/*
+ * Serialize and restore datums so that we can transfer them to parallel
+ * workers.
+ */
+extern Size datumEstimateSpace(Datum value, bool isnull, bool typByVal,
+				   int typLen);
+extern void datumSerialize(Datum value, bool isnull, bool typByVal,
+			   int typLen, char **start_address);
+extern Datum datumRestore(char **start_address, bool *isnull);
+
 #endif   /* DATUM_H */
-- 
2.3.8 (Apple Git-58)
