The Gather node, as currently committed, is neither projection-capable
nor listed as an exception in is_projection_capable_plan.  Amit
discovered this in his testing, and I ran into it in mine as well.  We
could simply mark it as not projection-capable, but I think it would
be better to go the other way and give it projection capabilities.
Otherwise, we're going to start generating lots of
plans like this:

Result
-> Gather
  -> Partial Seq Scan

While that's not the end of the world, it needlessly flies in the face
of the general principle that executor nodes should support projection
wherever they can.  So attached is a patch to make Gather
projection-capable (gather-project.patch).  It has a slight dependency
on my patch to fix up the tqueue machinery for record types, so I've
attached that patch here as well (tqueue-record-types.patch).
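
For concreteness, the alternative fix would amount to a one-line
addition to is_projection_capable_plan() in createplan.c, roughly like
this (just a sketch of the rejected approach, not part of either
attached patch):

bool
is_projection_capable_plan(Plan *plan)
{
	/* Most plan types can project, so just list the ones that can't */
	switch (nodeTag(plan))
	{
		case T_Hash:
		case T_Material:
		case T_Sort:
		case T_Unique:
		case T_SetOp:
		case T_LockRows:
		case T_Limit:
		case T_ModifyTable:
		case T_Append:
		case T_MergeAppend:
		case T_RecursiveUnion:
		case T_Gather:		/* the change we'd make instead */
			return false;
		default:
			break;
	}
	return true;
}

That would make the planner stick a Result node on top whenever
Gather's target list needs any evaluation, which is exactly the plan
shape shown above.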

Comments?  Reviews?

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
commit d17bac203638c0d74696602069693aa41dea1894
Author: Robert Haas <rhaas@postgresql.org>
Date:   Wed Oct 7 12:43:22 2015 -0400

    Modify tqueue infrastructure to support transient record types.
    
    Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
    mechanism, failed to account for the fact that the RECORD pseudo-type
    uses transient typmods that are only meaningful within a single
    backend.  Transferring such tuples without modification between two
    cooperating backends does not work.  This commit installs a system
    for passing the tuple descriptors over the same shm_mq being used to
    send the tuples themselves.  The two sides might not assign the same
    transient typmod to any given tuple descriptor, so we must also
    substitute the appropriate receiver-side typmod for the one used by
    the sender.  That adds some CPU overhead, but still seems better than
    being unable to pass records between cooperating parallel processes.
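
To summarize what the code below does, the queue protocol ends up
looking about like this (an overview of the patch, not a formal spec):

/*
 * Each shm_mq message sent by the worker is one of three things:
 *
 * 1. A single byte, 'c' or 'd', which switches the queue into control
 *    mode or data mode respectively.  The queue starts out in data mode.
 *
 * 2. In control mode, a message describing one transient record type:
 *        int     remote typmod
 *        int     natts
 *        bool    tdhasoid
 *        natts copies of FormData_pg_attribute
 *
 * 3. In data mode, the raw HeapTuple data, one tuple per message.
 *
 * The receiving backend blesses an equivalent tuple descriptor locally,
 * remembers the remote-to-local typmod mapping in a hash table, and
 * rewrites the typmods of any records (including nested ones) in the
 * tuples it hands back from TupleQueueFunnelNext.
 */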

diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 69df9e3..4791320 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -221,6 +221,7 @@ gather_getnext(GatherState *gatherstate)
 
 			/* wait only if local scan is done */
 			tup = TupleQueueFunnelNext(gatherstate->funnel,
+									   slot->tts_tupleDescriptor,
 									   gatherstate->need_to_scan_locally,
 									   &done);
 			if (done)
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 67143d3..53b69e0 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -21,23 +21,55 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "catalog/pg_type.h"
 #include "executor/tqueue.h"
+#include "funcapi.h"
+#include "lib/stringinfo.h"
 #include "miscadmin.h"
+#include "utils/array.h"
+#include "utils/memutils.h"
+#include "utils/typcache.h"
 
 typedef struct
 {
 	DestReceiver pub;
 	shm_mq_handle *handle;
+	MemoryContext	tmpcontext;
+	HTAB	   *recordhtab;
+	char		mode;
 }	TQueueDestReceiver;
 
+typedef struct RecordTypemodMap
+{
+	int			remotetypmod;
+	int			localtypmod;
+} RecordTypemodMap;
+
 struct TupleQueueFunnel
 {
 	int			nqueues;
 	int			maxqueues;
 	int			nextqueue;
 	shm_mq_handle **queue;
+	char	   *mode;
+	HTAB	   *typmodmap;
 };
 
+#define		TUPLE_QUEUE_MODE_CONTROL			'c'
+#define		TUPLE_QUEUE_MODE_DATA				'd'
+
+static void tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value);
+static void tqueueWalkRecordArray(TQueueDestReceiver *tqueue, Datum value);
+static void TupleQueueHandleControlMessage(TupleQueueFunnel *funnel,
+			Size nbytes, char *data);
+static HeapTuple TupleQueueHandleDataMessage(TupleQueueFunnel *funnel,
+							TupleDesc tupledesc, Size nbytes,
+							HeapTupleHeader data);
+static HeapTuple TupleQueueRemapTuple(TupleQueueFunnel *funnel,
+					 TupleDesc tupledesc, HeapTuple tuple);
+static Datum TupleQueueRemapRecord(TupleQueueFunnel *funnel, Datum value);
+static Datum TupleQueueRemapRecordArray(TupleQueueFunnel *funnel, Datum value);
+
 /*
  * Receive a tuple.
  */
@@ -46,12 +78,178 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 {
 	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
 	HeapTuple	tuple;
+	HeapTupleHeader tup;
+	AttrNumber	i;
 
 	tuple = ExecMaterializeSlot(slot);
+	tup = tuple->t_data;
+
+	/*
+	 * If any of the columns that we're sending back are records, special
+	 * handling is required, because the tuple descriptors are stored in a
+	 * backend-local cache, and the backend receiving data from us need not
+	 * have the same cache contents we do.  We grovel through the tuple,
+	 * find all the transient record types contained therein, and send
+	 * special control messages through the queue so that the receiving
+	 * process can interpret them correctly.
+	 */
+	for (i = 0; i < slot->tts_tupleDescriptor->natts; ++i)
+	{
+		Form_pg_attribute attr = slot->tts_tupleDescriptor->attrs[i];
+		MemoryContext	oldcontext;
+
+		/* Ignore non-record columns; they need no special handling. */
+		if (attr->atttypid != RECORDOID && attr->atttypid != RECORDARRAYOID)
+			continue;
+
+		/*
+		 * OK, we're going to need to examine this attribute.  We could
+		 * use heap_deform_tuple here, but there's a possibility that the
+		 * slot already contains the deconstructed tuple, in which case
+		 * deforming it again would be needlessly inefficient.  Note that
+		 * we must do this before consulting tts_isnull[i], which is not
+		 * valid until the attribute has been fetched into the slot.
+		 */
+		slot_getallattrs(slot);
+
+		/* Ignore nulls. */
+		if (slot->tts_isnull[i])
+			continue;
+
+		/* Switch to temporary memory context to avoid leaking. */
+		if (tqueue->tmpcontext == NULL)
+			tqueue->tmpcontext =
+				AllocSetContextCreate(TopTransactionContext,
+									  "tqueue temporary context",
+									  ALLOCSET_DEFAULT_MINSIZE,
+									  ALLOCSET_DEFAULT_INITSIZE,
+									  ALLOCSET_DEFAULT_MAXSIZE);
+		oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
+		if (attr->atttypid == RECORDOID)
+			tqueueWalkRecord(tqueue, slot->tts_values[i]);
+		else
+			tqueueWalkRecordArray(tqueue, slot->tts_values[i]);
+		MemoryContextSwitchTo(oldcontext);
+
+	/* Clean up any memory we allocated. */
+		MemoryContextReset(tqueue->tmpcontext);
+	}
+
+	/* If we entered control mode, switch back to data mode. */
+	if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
+	{
+		tqueue->mode = TUPLE_QUEUE_MODE_DATA;
+		shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
+	}
+
+	/* Send the tuple itself. */
 	shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
 }
 
 /*
+ * Walk a record and send control messages for transient record types
+ * contained therein.
+ */
+static void
+tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value)
+{
+	HeapTupleHeader	tup;
+	int			typmod;
+	bool		found;
+	TupleDesc	tupledesc;
+	Datum	   *values;
+	bool	   *isnull;
+	HeapTupleData	tdata;
+	AttrNumber	i;
+
+	/* Extract typmod from tuple. */
+	tup = DatumGetHeapTupleHeader(value);
+	typmod = HeapTupleHeaderGetTypMod(tup);
+
+	/* Look up tuple descriptor in typecache. */
+	tupledesc = lookup_rowtype_tupdesc(RECORDOID, typmod);
+
+	/* Initialize hash table if not done yet. */
+	if (tqueue->recordhtab == NULL)
+	{
+		HASHCTL		ctl;
+
+		ctl.keysize = sizeof(int);
+		ctl.entrysize = sizeof(int);
+		ctl.hcxt = TopMemoryContext;
+		tqueue->recordhtab = hash_create("tqueue record hashtable",
+										 100, &ctl,
+										 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	}
+
+	/* Have we already seen this record type?  If not, must report it. */
+	hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
+	if (!found)
+	{
+		StringInfoData	buf;
+
+		/* If message queue is in data mode, switch to control mode. */
+		if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
+		{
+			tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
+			shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
+		}
+
+		/* Assemble a control message. */
+		initStringInfo(&buf);
+		appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int));
+		appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
+		appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid,
+							   sizeof(bool));
+		for (i = 0; i < tupledesc->natts; ++i)
+			appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i],
+								   sizeof(FormData_pg_attribute));
+
+		/* Send control message. */
+		shm_mq_send(tqueue->handle, buf.len, buf.data, false);
+	}
+
+	/* Deform the tuple so we can check each column within. */
+	values = palloc(tupledesc->natts * sizeof(Datum));
+	isnull = palloc(tupledesc->natts * sizeof(bool));
+	tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
+	ItemPointerSetInvalid(&(tdata.t_self));
+	tdata.t_tableOid = InvalidOid;
+	tdata.t_data = tup;
+	heap_deform_tuple(&tdata, tupledesc, values, isnull);
+
+	/* Recursively check each non-NULL attribute. */
+	for (i = 0; i < tupledesc->natts; ++i)
+	{
+		Form_pg_attribute attr = tupledesc->attrs[i];
+		if (isnull[i])
+			continue;
+		if (attr->atttypid == RECORDOID)
+			tqueueWalkRecord(tqueue, values[i]);
+		if (attr->atttypid == RECORDARRAYOID)
+			tqueueWalkRecordArray(tqueue, values[i]);
+	}
+
+	/* Release reference count acquired by lookup_rowtype_tupdesc. */
+	DecrTupleDescRefCount(tupledesc);
+}
+
+/*
+ * Walk an array of records and send control messages for any transient
+ * record types contained therein.
+ */
+static void
+tqueueWalkRecordArray(TQueueDestReceiver *tqueue, Datum value)
+{
+	ArrayType  *arr = DatumGetArrayTypeP(value);
+	Datum	   *elem_values;
+	bool	   *elem_nulls;
+	int			num_elems;
+	int			i;
+
+	Assert(ARR_ELEMTYPE(arr) == RECORDOID);
+	deconstruct_array(arr, RECORDOID, -1, false, 'd',
+					  &elem_values, &elem_nulls, &num_elems);
+	for (i = 0; i < num_elems; ++i)
+		if (!elem_nulls[i])
+			tqueueWalkRecord(tqueue, elem_values[i]);
+}
+
+/*
  * Prepare to receive tuples from executor.
  */
 static void
@@ -77,6 +275,12 @@ tqueueShutdownReceiver(DestReceiver *self)
 static void
 tqueueDestroyReceiver(DestReceiver *self)
 {
+	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
+
+	if (tqueue->tmpcontext != NULL)
+		MemoryContextDelete(tqueue->tmpcontext);
+	if (tqueue->recordhtab != NULL)
+		hash_destroy(tqueue->recordhtab);
 	pfree(self);
 }
 
@@ -96,6 +300,9 @@ CreateTupleQueueDestReceiver(shm_mq_handle *handle)
 	self->pub.rDestroy = tqueueDestroyReceiver;
 	self->pub.mydest = DestTupleQueue;
 	self->handle = handle;
+	self->tmpcontext = NULL;
+	self->recordhtab = NULL;
+	self->mode = TUPLE_QUEUE_MODE_DATA;
 
 	return (DestReceiver *) self;
 }
@@ -110,6 +317,7 @@ CreateTupleQueueFunnel(void)
 
 	funnel->maxqueues = 8;
 	funnel->queue = palloc(funnel->maxqueues * sizeof(shm_mq_handle *));
+	funnel->mode = palloc(funnel->maxqueues * sizeof(char));
 
 	return funnel;
 }
@@ -125,6 +333,9 @@ DestroyTupleQueueFunnel(TupleQueueFunnel *funnel)
 	for (i = 0; i < funnel->nqueues; i++)
 		shm_mq_detach(shm_mq_get_queue(funnel->queue[i]));
 	pfree(funnel->queue);
+	pfree(funnel->mode);
+	if (funnel->typmodmap != NULL)
+		hash_destroy(funnel->typmodmap);
 	pfree(funnel);
 }
 
@@ -134,12 +345,6 @@ DestroyTupleQueueFunnel(TupleQueueFunnel *funnel)
 void
 RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
 {
-	if (funnel->nqueues < funnel->maxqueues)
-	{
-		funnel->queue[funnel->nqueues++] = handle;
-		return;
-	}
-
 	if (funnel->nqueues >= funnel->maxqueues)
 	{
 		int			newsize = funnel->nqueues * 2;
@@ -148,10 +353,12 @@ RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
 
 		funnel->queue = repalloc(funnel->queue,
 								 newsize * sizeof(shm_mq_handle *));
+		funnel->mode = repalloc(funnel->mode, newsize * sizeof(char));
 		funnel->maxqueues = newsize;
 	}
 
-	funnel->queue[funnel->nqueues++] = handle;
+	funnel->queue[funnel->nqueues] = handle;
+	funnel->mode[funnel->nqueues++] = TUPLE_QUEUE_MODE_DATA;
 }
 
 /*
@@ -172,7 +379,8 @@ RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
  * any other case.
  */
 HeapTuple
-TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
+TupleQueueFunnelNext(TupleQueueFunnel *funnel, TupleDesc tupledesc,
+					 bool nowait, bool *done)
 {
 	int			waitpos = funnel->nextqueue;
 
@@ -190,6 +398,7 @@ TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
 	for (;;)
 	{
 		shm_mq_handle *mqh = funnel->queue[funnel->nextqueue];
+		char	   *modep = &funnel->mode[funnel->nextqueue];
 		shm_mq_result result;
 		Size		nbytes;
 		void	   *data;
@@ -198,15 +407,10 @@ TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
 		result = shm_mq_receive(mqh, &nbytes, &data, true);
 
 		/*
-		 * Normally, we advance funnel->nextqueue to the next queue at this
-		 * point, but if we're pointing to a queue that we've just discovered
-		 * is detached, then forget that queue and leave the pointer where it
-		 * is until the number of remaining queues fall below that pointer and
-		 * at that point make the pointer point to the first queue.
+		 * If this queue has been detached, forget about it and shift the
+		 * remaining queues downward in the array.
 		 */
-		if (result != SHM_MQ_DETACHED)
-			funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues;
-		else
+		if (result == SHM_MQ_DETACHED)
 		{
 			--funnel->nqueues;
 			if (funnel->nqueues == 0)
@@ -230,21 +434,32 @@ TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
 			continue;
 		}
 
+		/* Advance nextqueue pointer to next queue in round-robin fashion. */
+		funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues;
+
 		/* If we got a message, return it. */
 		if (result == SHM_MQ_SUCCESS)
 		{
-			HeapTupleData htup;
-
-			/*
-			 * The tuple data we just read from the queue is only valid until
-			 * we again attempt to read from it.  Copy the tuple into a single
-			 * palloc'd chunk as callers will expect.
-			 */
-			ItemPointerSetInvalid(&htup.t_self);
-			htup.t_tableOid = InvalidOid;
-			htup.t_len = nbytes;
-			htup.t_data = data;
-			return heap_copytuple(&htup);
+			if (nbytes == 1)
+			{
+				/* Mode switch message. */
+				*modep = ((char *) data)[0];
+				continue;
+			}
+			else if (*modep == TUPLE_QUEUE_MODE_DATA)
+			{
+				/* Tuple data. */
+				return TupleQueueHandleDataMessage(funnel, tupledesc,
+												   nbytes, data);
+			}
+			else if (*modep == TUPLE_QUEUE_MODE_CONTROL)
+			{
+				/* Control message, describing a transient record type. */
+				TupleQueueHandleControlMessage(funnel, nbytes, data);
+				continue;
+			}
+			else
+				elog(ERROR, "invalid mode: %d", (int) *modep);
 		}
 
 		/*
@@ -262,3 +477,224 @@ TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
 		}
 	}
 }
+
+/*
+ * Handle a data message - that is, a tuple - from the tuple queue funnel.
+ */
+static HeapTuple
+TupleQueueHandleDataMessage(TupleQueueFunnel *funnel, TupleDesc tupledesc,
+							Size nbytes, HeapTupleHeader data)
+{
+	HeapTupleData htup;
+
+	ItemPointerSetInvalid(&htup.t_self);
+	htup.t_tableOid = InvalidOid;
+	htup.t_len = nbytes;
+	htup.t_data = data;
+
+	/* If necessary, remap record typmods. */
+	if (funnel->typmodmap != NULL)
+	{
+		HeapTuple	newtuple;
+
+		newtuple = TupleQueueRemapTuple(funnel, tupledesc, &htup);
+		if (newtuple != NULL)
+			return newtuple;
+	}
+
+	/*
+	 * Otherwise, just copy the tuple into a single palloc'd chunk, as
+	 * callers will expect.
+	 */
+	return heap_copytuple(&htup);
+}
+
+/*
+ * Remap tuple typmods per control information received from remote side.
+ */
+static HeapTuple
+TupleQueueRemapTuple(TupleQueueFunnel *funnel, TupleDesc tupledesc,
+					 HeapTuple tuple)
+{
+	Datum	   *values;
+	bool	   *isnull;
+	bool		dirty = false;
+	int			i;
+
+	/* Deform tuple so we can remap record typmods for individual attrs. */
+	values = palloc(tupledesc->natts * sizeof(Datum));
+	isnull = palloc(tupledesc->natts * sizeof(bool));
+	heap_deform_tuple(tuple, tupledesc, values, isnull);
+
+	/* Recursively check each non-NULL attribute. */
+	for (i = 0; i < tupledesc->natts; ++i)
+	{
+		Form_pg_attribute attr = tupledesc->attrs[i];
+
+		if (isnull[i])
+			continue;
+
+		if (attr->atttypid == RECORDOID)
+		{
+			values[i] = TupleQueueRemapRecord(funnel, values[i]);
+			dirty = true;
+		}
+
+		if (attr->atttypid == RECORDARRAYOID)
+		{
+			values[i] = TupleQueueRemapRecordArray(funnel, values[i]);
+			dirty = true;
+		}
+	}
+
+	/* If we didn't need to change anything, just return NULL. */
+	if (!dirty)
+		return NULL;
+
+	/* Reform the modified tuple. */
+	return heap_form_tuple(tupledesc, values, isnull);
+}
+
+static Datum
+TupleQueueRemapRecord(TupleQueueFunnel *funnel, Datum value)
+{
+	HeapTupleHeader	tup;
+	int				remotetypmod;
+	RecordTypemodMap *mapent;
+	TupleDesc		atupledesc;
+	HeapTupleData	htup;
+	HeapTuple		atup;
+
+	tup = DatumGetHeapTupleHeader(value);
+
+	/* Map remote typmod to local typmod and get tupledesc. */
+	remotetypmod = HeapTupleHeaderGetTypMod(tup);
+	Assert(funnel->typmodmap != NULL);
+	mapent = hash_search(funnel->typmodmap, &remotetypmod,
+						 HASH_FIND, NULL);
+	if (mapent == NULL)
+		elog(ERROR, "found unrecognized remote typmod %d",
+			 remotetypmod);
+	atupledesc = lookup_rowtype_tupdesc(RECORDOID, mapent->localtypmod);
+
+	/* Recursively process contents of record. */
+	ItemPointerSetInvalid(&htup.t_self);
+	htup.t_tableOid = InvalidOid;
+	htup.t_len = HeapTupleHeaderGetDatumLength(tup);
+	htup.t_data = tup;
+	atup = TupleQueueRemapTuple(funnel, atupledesc, &htup);
+
+	/* Release reference count acquired by lookup_rowtype_tupdesc. */
+	DecrTupleDescRefCount(atupledesc);
+
+	/*
+	 * Even if none of the attributes inside this tuple are records that
+	 * require typmod remapping, we still need to change the typmod on
+	 * the record itself.  However, we can do that by copying the tuple
+	 * rather than reforming it.
+	 */
+	if (atup == NULL)
+	{
+		atup = heap_copytuple(&htup);
+		HeapTupleHeaderSetTypMod(atup->t_data, mapent->localtypmod);
+	}
+
+	return HeapTupleHeaderGetDatum(atup->t_data);
+}
+
+static Datum
+TupleQueueRemapRecordArray(TupleQueueFunnel *funnel, Datum value)
+{
+	ArrayType  *arr = DatumGetArrayTypeP(value);
+	Datum	   *elem_values;
+	bool	   *elem_nulls;
+	int			num_elems;
+	int			i;
+
+	Assert(ARR_ELEMTYPE(arr) == RECORDOID);
+	deconstruct_array(arr, RECORDOID, -1, false, 'd',
+					  &elem_values, &elem_nulls, &num_elems);
+	for (i = 0; i < num_elems; ++i)
+		if (!elem_nulls[i])
+			elem_values[i] = TupleQueueRemapRecord(funnel, elem_values[i]);
+	arr = construct_md_array(elem_values, elem_nulls,
+							 ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
+							 RECORDOID,
+							 -1, false, 'd');
+	return PointerGetDatum(arr);
+}
+
+/*
+ * Handle a control message from the tuple queue funnel.
+ *
+ * Control messages are sent when the remote side is sending tuples that
+ * contain transient record types.  We need to arrange to bless those
+ * record types locally and translate between remote and local typmods.
+ */
+static void
+TupleQueueHandleControlMessage(TupleQueueFunnel *funnel, Size nbytes,
+							   char *data)
+{
+	int		natts;
+	int		remotetypmod;
+	bool	hasoid;
+	char   *buf = data;
+	int		rc = 0;
+	int		i;
+	Form_pg_attribute *attrs;
+	MemoryContext	oldcontext;
+	TupleDesc	tupledesc;
+	RecordTypemodMap *mapent;
+	bool	found;
+
+	/* Extract remote typmod. */
+	memcpy(&remotetypmod, &buf[rc], sizeof(int));
+	rc += sizeof(int);
+
+	/* Extract attribute count. */
+	memcpy(&natts, &buf[rc], sizeof(int));
+	rc += sizeof(int);
+
+	/* Extract hasoid flag. */
+	memcpy(&hasoid, &buf[rc], sizeof(bool));
+	rc += sizeof(bool);
+
+	/* Extract attribute details. */
+	oldcontext = MemoryContextSwitchTo(CurTransactionContext);
+	attrs = palloc(natts * sizeof(Form_pg_attribute));
+	for (i = 0; i < natts; ++i)
+	{
+		attrs[i] = palloc(sizeof(FormData_pg_attribute));
+		memcpy(attrs[i], &buf[rc], sizeof(FormData_pg_attribute));
+		rc += sizeof(FormData_pg_attribute);
+	}
+	MemoryContextSwitchTo(oldcontext);
+
+	/* We should have read the whole message. */
+	Assert(rc == nbytes);
+
+	/* Construct TupleDesc. */
+	tupledesc = CreateTupleDesc(natts, hasoid, attrs);
+	tupledesc = BlessTupleDesc(tupledesc);
+
+	/* Create map if it doesn't exist already. */
+	if (funnel->typmodmap == NULL)
+	{
+		HASHCTL		ctl;
+
+		ctl.keysize = sizeof(int);
+		ctl.entrysize = sizeof(RecordTypemodMap);
+		ctl.hcxt = CurTransactionContext;
+		funnel->typmodmap = hash_create("typmodmap hashtable",
+										100, &ctl,
+										HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	}
+
+	/* Create map entry. */
+	mapent = hash_search(funnel->typmodmap, &remotetypmod, HASH_ENTER,
+						 &found);
+	if (found)
+		elog(ERROR, "duplicate message for typmod %d",
+			 remotetypmod);
+	mapent->localtypmod = tupledesc->tdtypmod;
+}
diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h
index 6f8eb73..59f35c7 100644
--- a/src/include/executor/tqueue.h
+++ b/src/include/executor/tqueue.h
@@ -25,7 +25,7 @@ typedef struct TupleQueueFunnel TupleQueueFunnel;
 extern TupleQueueFunnel *CreateTupleQueueFunnel(void);
 extern void DestroyTupleQueueFunnel(TupleQueueFunnel *funnel);
 extern void RegisterTupleQueueOnFunnel(TupleQueueFunnel *, shm_mq_handle *);
-extern HeapTuple TupleQueueFunnelNext(TupleQueueFunnel *, bool nowait,
-					 bool *done);
+extern HeapTuple TupleQueueFunnelNext(TupleQueueFunnel *, TupleDesc tupledesc,
+					 bool nowait, bool *done);
 
 #endif   /* TQUEUE_H */
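
In case it helps review, here's roughly how the two ends of this API
are meant to be used (hypothetical variables mqh and slot, error
handling omitted; nodeGather.c below is the real caller):

	/* Worker side: push executor output into the shared memory queue. */
	DestReceiver *dest = CreateTupleQueueDestReceiver(mqh);
	/* ... run the plan with 'dest' as the DestReceiver ... */

	/* Leader side: pull tuples from registered workers via the funnel. */
	TupleQueueFunnel *funnel = CreateTupleQueueFunnel();
	RegisterTupleQueueOnFunnel(funnel, mqh);

	for (;;)
	{
		bool		done;
		HeapTuple	tup;

		/* The new TupleDesc argument is what enables typmod remapping. */
		tup = TupleQueueFunnelNext(funnel, slot->tts_tupleDescriptor,
								   false,	/* wait rather than poll */
								   &done);
		if (done)
			break;
		if (HeapTupleIsValid(tup))
			ExecStoreTuple(tup, slot, InvalidBuffer, true);
	}

	DestroyTupleQueueFunnel(funnel);
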
commit da52bc825554ea7937398b4b296f3ecd6e6822af
Author: Robert Haas <rhaas@postgresql.org>
Date:   Tue Oct 20 21:47:18 2015 -0400

    Make Gather node projection-capable.
    
    The original Gather code failed to mark the Gather node as not
    projection-capable, even though it could not actually do projection,
    despite initializing its projection info via ExecAssignProjectionInfo.
    There doesn't seem to be any good reason for this node not to have
    projection capability, so clean things up so that it does.  Without
    this, plans using Gather nodes need to carry extra Result nodes to do
    projection.

diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 4791320..48d6c31 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -36,6 +36,7 @@
 #include "executor/nodeGather.h"
 #include "executor/nodeSubplan.h"
 #include "executor/tqueue.h"
+#include "utils/memutils.h"
 #include "utils/rel.h"
 
 
@@ -50,6 +51,9 @@ GatherState *
 ExecInitGather(Gather *node, EState *estate, int eflags)
 {
 	GatherState *gatherstate;
+	Plan	   *outerNode;
+	bool		hasoid;
+	TupleDesc	tupDesc;
 
 	/* Gather node doesn't have innerPlan node. */
 	Assert(innerPlan(node) == NULL);
@@ -82,13 +86,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	/*
 	 * tuple table initialization
 	 */
+	gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
 	ExecInitResultTupleSlot(estate, &gatherstate->ps);
 
 	/*
 	 * now initialize outer plan
 	 */
-	outerPlanState(gatherstate) = ExecInitNode(outerPlan(node), estate, eflags);
-
+	outerNode = outerPlan(node);
+	outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
 
 	gatherstate->ps.ps_TupFromTlist = false;
 
@@ -98,6 +103,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	ExecAssignResultTypeFromTL(&gatherstate->ps);
 	ExecAssignProjectionInfo(&gatherstate->ps, NULL);
 
+	/*
+	 * Initialize funnel slot to same tuple descriptor as outer plan.
+	 */
+	if (!ExecContextForcesOids(&gatherstate->ps, &hasoid))
+		hasoid = false;
+	tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
+	ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
+
 	return gatherstate;
 }
 
@@ -113,6 +126,9 @@ ExecGather(GatherState *node)
 {
 	int			i;
 	TupleTableSlot *slot;
+	TupleTableSlot *resultSlot;
+	ExprDoneCond isDone;
+	ExprContext *econtext;
 
 	/*
 	 * Initialize the parallel context and workers on first execution. We do
@@ -169,7 +185,53 @@ ExecGather(GatherState *node)
 		node->initialized = true;
 	}
 
-	slot = gather_getnext(node);
+	/*
+	 * Check to see if we're still projecting out tuples from a previous scan
+	 * tuple (because there is a function-returning-set in the projection
+	 * expressions).  If so, try to project another one.
+	 */
+	if (node->ps.ps_TupFromTlist)
+	{
+		resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone);
+		if (isDone == ExprMultipleResult)
+			return resultSlot;
+		/* Done with that source tuple... */
+		node->ps.ps_TupFromTlist = false;
+	}
+
+	/*
+	 * Reset per-tuple memory context to free any expression evaluation
+	 * storage allocated in the previous tuple cycle.  Note we can't do this
+	 * until we're done projecting.
+	 */
+	econtext = node->ps.ps_ExprContext;
+	ResetExprContext(econtext);
+
+	/* Get and return the next tuple, projecting if necessary. */
+	for (;;)
+	{
+		/*
+		 * Get next tuple, either from one of our workers, or by running the
+		 * plan ourselves.
+		 */
+		slot = gather_getnext(node);
+		if (TupIsNull(slot))
+			return NULL;
+
+		/*
+		 * form the result tuple using ExecProject(), and return it --- unless
+		 * the projection produces an empty set, in which case we must loop
+		 * back around for another tuple
+		 */
+		econtext->ecxt_outertuple = slot;
+		resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone);
+
+		if (isDone != ExprEndResult)
+		{
+			node->ps.ps_TupFromTlist = (isDone == ExprMultipleResult);
+			return resultSlot;
+		}
+	}
 
 	return slot;
 }
@@ -201,18 +263,11 @@ ExecEndGather(GatherState *node)
 static TupleTableSlot *
 gather_getnext(GatherState *gatherstate)
 {
-	PlanState  *outerPlan;
+	PlanState  *outerPlan = outerPlanState(gatherstate);
 	TupleTableSlot *outerTupleSlot;
-	TupleTableSlot *slot;
+	TupleTableSlot *fslot = gatherstate->funnel_slot;
 	HeapTuple	tup;
 
-	/*
-	 * We can use projection info of Gather for the tuples received from
-	 * worker backends as currently for all cases worker backends sends the
-	 * projected tuple as required by Gather node.
-	 */
-	slot = gatherstate->ps.ps_ProjInfo->pi_slot;
-
 	while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
 	{
 		if (gatherstate->funnel != NULL)
@@ -221,7 +276,7 @@ gather_getnext(GatherState *gatherstate)
 
 			/* wait only if local scan is done */
 			tup = TupleQueueFunnelNext(gatherstate->funnel,
-									   slot->tts_tupleDescriptor,
+									   fslot->tts_tupleDescriptor,
 									   gatherstate->need_to_scan_locally,
 									   &done);
 			if (done)
@@ -230,19 +285,17 @@ gather_getnext(GatherState *gatherstate)
 			if (HeapTupleIsValid(tup))
 			{
 				ExecStoreTuple(tup,		/* tuple to store */
-							   slot,	/* slot to store in */
+							   fslot,	/* slot in which to store the tuple */
 							   InvalidBuffer,	/* buffer associated with this
 												 * tuple */
 							   true);	/* pfree this pointer if not from heap */
 
-				return slot;
+				return fslot;
 			}
 		}
 
 		if (gatherstate->need_to_scan_locally)
 		{
-			outerPlan = outerPlanState(gatherstate);
-
 			outerTupleSlot = ExecProcNode(outerPlan);
 
 			if (!TupIsNull(outerTupleSlot))
@@ -252,7 +305,7 @@ gather_getnext(GatherState *gatherstate)
 		}
 	}
 
-	return ExecClearTuple(slot);
+	return ExecClearTuple(fslot);
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index 8c6c571..48d6e6f 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -602,12 +602,15 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
 			set_join_references(root, (Join *) plan, rtoffset);
 			break;
 
+		case T_Gather:
+			set_upper_references(root, plan, rtoffset);
+			break;
+
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
 		case T_Unique:
 		case T_SetOp:
-		case T_Gather:
 
 			/*
 			 * These plan types don't actually bother to evaluate their
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4fcdcc4..939bc0e 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1964,6 +1964,7 @@ typedef struct GatherState
 	bool		initialized;
 	struct ParallelExecutorInfo *pei;
 	struct TupleQueueFunnel *funnel;
+	TupleTableSlot *funnel_slot;
 	bool		need_to_scan_locally;
 } GatherState;
 