On Wed, Oct 28, 2015 at 10:23 AM, Robert Haas <robertmh...@gmail.com> wrote:
> On Sun, Oct 18, 2015 at 12:17 AM, Robert Haas <robertmh...@gmail.com> wrote:
>>> So reviewing patch 13 isn't possible without prior knowledge.
>>
>> The basic question for patch 13 is whether ephemeral record types can
>> occur in executor tuples in any contexts that I haven't identified.  I
>> know that a tuple table slot can contain a column that is of type
>> record or record[], and those records can themselves contain
>> attributes of type record or record[], and so on as far down as you
>> like.  I *think* that's the only case.  For example, I don't believe
>> that a TupleTableSlot can contain a *named* record type that has an
>> anonymous record buried down inside of it somehow.  But I'm not
>> positive I'm right about that.
>
> I have done some more testing and investigation and determined that
> this optimism was unwarranted.  It turns out that the type information
> for composite and record types gets stored in two different places.
> First, the TupleTableSlot has a type OID, indicating the sort of the
> value it expects to be stored for that slot attribute.  Second, the
> value itself contains a type OID and typmod.  And these don't have to
> match.  For example, consider this query:
>
> select row_to_json(i) from int8_tbl i(x,y);
>
> Without i(x,y), the HeapTuple passed to row_to_json is labelled with
> the pg_type OID of int8_tbl.  But with the query as written, it's
> labeled as an anonymous record type.  If I jigger things by hacking
> the code so that this is planned as Gather (single-copy) -> SeqScan,
> with row_to_json evaluated at the Gather node, then the sequential
> scan kicks out a tuple with a transient record type and stores it into
> a slot whose type OID is still that of int8_tbl.  My previous patch
> failed to deal with that; the attached one does.
>
> The previous patch was also defective in a few other respects.  The
> most significant of those, maybe, is that it somehow thought it was OK
> to assume that transient typmods from all workers could be treated
> interchangeably rather than individually.  To fix this, I've changed
> the TupleQueueFunnel implemented by tqueue.c to be merely a
> TupleQueueReader which handles reading from a single worker only.
> nodeGather.c therefore creates one TupleQueueReader per worker instead
> of a single TupleQueueFunnel for all workers; accordingly, the logic
> for multiplexing multiple queues now lives in nodeGather.c.  This is
> probably how I should have done it originally; someone (I think Jeff
> Davis) complained previously that tqueue.c had no business embedding
> the round-robin policy decision, and he was right.  So this addresses
> that complaint as well.
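
To make the labeling difference described above easy to reproduce, here
is a minimal pair of queries (assuming the int8_tbl table from the
regression tests):

    -- row is tagged with int8_tbl's composite type OID:
    SELECT row_to_json(i) FROM int8_tbl i;
    -- row is tagged as an anonymous record with a transient typmod:
    SELECT row_to_json(i) FROM int8_tbl i(x, y);

The second form is the shape of query that needs the typmod-mapping
machinery once the tuple has to cross a tuple queue.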

Here is an updated version.  This is rebased over recent commits, and
I added a missing check for attisdropped.
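
For review purposes, here are a couple of illustrative queries (not part
of the patch) that bury transient record types inside other containers,
which is what the recursive walker in tqueue.c has to chase down:

    -- anonymous records nested inside an array (record[]):
    SELECT ARRAY[ROW(1, 2), ROW(3, 4)] AS recs;
    -- an anonymous record nested inside another anonymous record:
    SELECT ROW(1, ROW(2, ROW(3))) AS nested;

The typmods attached to those values are only meaningful in the backend
that built them, which is exactly what the control messages translate.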

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
From fa31300a884cc942d22c66d6a30fa4c2fcba3c6f Mon Sep 17 00:00:00 2001
From: Robert Haas <rh...@postgresql.org>
Date: Wed, 7 Oct 2015 12:43:22 -0400
Subject: [PATCH 5/5] Modify tqueue infrastructure to support transient record
 types.

Commit 4a4e6893aa080b9094dadbe0e65f8a75fee41ac6, which introduced this
mechanism, failed to account for the fact that the RECORD pseudo-type
uses transient typmods that are only meaningful within a single
backend.  Transferring such tuples without modification between two
cooperating backends does not work.  This commit installs a system
for passing the tuple descriptors over the same shm_mq being used to
send the tuples themselves.  The two sides might not assign the same
transient typmod to any given tuple descriptor, so we must also
substitute the appropriate receiver-side typmod for the one used by
the sender.  That adds some CPU overhead, but still seems better than
being unable to pass records between cooperating parallel processes.

Along the way, move the logic for handling multiple tuple queues from
tqueue.c to nodeGather.c; tqueue.c now provides a TupleQueueReader,
which reads from a single queue, rather than a TupleQueueFunnel, which
potentially reads from multiple queues.  This change was suggested
previously as a way to make sure that nodeGather.c rather than tqueue.c
had policy control over the order in which to read from queues, but
it wasn't clear to me until now how good an idea it was.  The typmod
mapping needs to be performed separately for each queue, and it is much
simpler if the tqueue.c code handles that and leaves the multiplexing of
multiple queues to higher layers of the stack.
---
 src/backend/executor/nodeGather.c | 138 ++++--
 src/backend/executor/tqueue.c     | 983 +++++++++++++++++++++++++++++++++-----
 src/include/executor/tqueue.h     |  12 +-
 src/include/nodes/execnodes.h     |   4 +-
 src/tools/pgindent/typedefs.list  |   2 +-
 5 files changed, 986 insertions(+), 153 deletions(-)

diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 5f58961..850c67e 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -36,11 +36,13 @@
 #include "executor/nodeGather.h"
 #include "executor/nodeSubplan.h"
 #include "executor/tqueue.h"
+#include "miscadmin.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
 
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+static HeapTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
 
 
@@ -125,6 +127,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 TupleTableSlot *
 ExecGather(GatherState *node)
 {
+	TupleTableSlot *fslot = node->funnel_slot;
 	int			i;
 	TupleTableSlot *slot;
 	TupleTableSlot *resultSlot;
@@ -148,6 +151,7 @@ ExecGather(GatherState *node)
 		 */
 		if (gather->num_workers > 0 && IsInParallelMode())
 		{
+			ParallelContext *pcxt;
 			bool	got_any_worker = false;
 
 			/* Initialize the workers required to execute Gather node. */
@@ -160,18 +164,26 @@ ExecGather(GatherState *node)
 			 * Register backend workers. We might not get as many as we
 			 * requested, or indeed any at all.
 			 */
-			LaunchParallelWorkers(node->pei->pcxt);
+			pcxt = node->pei->pcxt;
+			LaunchParallelWorkers(pcxt);
 
-			/* Set up a tuple queue to collect the results. */
-			node->funnel = CreateTupleQueueFunnel();
-			for (i = 0; i < node->pei->pcxt->nworkers; ++i)
+			/* Set up tuple queue readers to read the results. */
+			if (pcxt->nworkers > 0)
 			{
-				if (node->pei->pcxt->worker[i].bgwhandle)
+				node->nreaders = 0;
+				node->reader =
+					palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+
+				for (i = 0; i < pcxt->nworkers; ++i)
 				{
+					if (pcxt->worker[i].bgwhandle == NULL)
+						continue;
+
 					shm_mq_set_handle(node->pei->tqueue[i],
-									  node->pei->pcxt->worker[i].bgwhandle);
-					RegisterTupleQueueOnFunnel(node->funnel,
-											   node->pei->tqueue[i]);
+									  pcxt->worker[i].bgwhandle);
+					node->reader[node->nreaders++] =
+						CreateTupleQueueReader(node->pei->tqueue[i],
+											   fslot->tts_tupleDescriptor);
 					got_any_worker = true;
 				}
 			}
@@ -182,7 +194,7 @@ ExecGather(GatherState *node)
 		}
 
 		/* Run plan locally if no workers or not single-copy. */
-		node->need_to_scan_locally = (node->funnel == NULL)
+		node->need_to_scan_locally = (node->reader == NULL)
 			|| !gather->single_copy;
 		node->initialized = true;
 	}
@@ -254,13 +266,9 @@ ExecEndGather(GatherState *node)
 }
 
 /*
- * gather_getnext
- *
- * Get the next tuple from shared memory queue.  This function
- * is responsible for fetching tuples from all the queues associated
- * with worker backends used in Gather node execution and if there is
- * no data available from queues or no worker is available, it does
- * fetch the data from local node.
+ * Read the next tuple.  We might fetch a tuple from one of the tuple queues
+ * using gather_readnext, or if no tuple queue contains a tuple and the
+ * single_copy flag is not set, we might generate one locally instead.
  */
 static TupleTableSlot *
 gather_getnext(GatherState *gatherstate)
@@ -270,18 +278,11 @@ gather_getnext(GatherState *gatherstate)
 	TupleTableSlot *fslot = gatherstate->funnel_slot;
 	HeapTuple	tup;
 
-	while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
+	while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
 	{
-		if (gatherstate->funnel != NULL)
+		if (gatherstate->reader != NULL)
 		{
-			bool		done = false;
-
-			/* wait only if local scan is done */
-			tup = TupleQueueFunnelNext(gatherstate->funnel,
-									   gatherstate->need_to_scan_locally,
-									   &done);
-			if (done)
-				ExecShutdownGatherWorkers(gatherstate);
+			tup = gather_readnext(gatherstate);
 
 			if (HeapTupleIsValid(tup))
 			{
@@ -309,6 +310,80 @@ gather_getnext(GatherState *gatherstate)
 	return ExecClearTuple(fslot);
 }
 
+/*
+ * Attempt to read a tuple from one of our parallel workers.
+ */
+static HeapTuple
+gather_readnext(GatherState *gatherstate)
+{
+	int		waitpos = gatherstate->nextreader;
+
+	for (;;)
+	{
+		TupleQueueReader *reader;
+		HeapTuple	tup;
+		bool		readerdone;
+
+		/* Make sure we've read all messages from workers. */
+		HandleParallelMessages();
+
+		/* Attempt to read a tuple, but don't block if none is available. */
+		reader = gatherstate->reader[gatherstate->nextreader];
+		tup = TupleQueueReaderNext(reader, true, &readerdone);
+
+		/*
+		 * If this reader is done, remove it.  If all readers are done,
+		 * clean up remaining worker state.
+		 */
+		if (readerdone)
+		{
+			DestroyTupleQueueReader(reader);
+			--gatherstate->nreaders;
+			if (gatherstate->nreaders == 0)
+			{
+				ExecShutdownGather(gatherstate);
+				return NULL;
+			}
+			else
+			{
+				memmove(&gatherstate->reader[gatherstate->nextreader],
+						&gatherstate->reader[gatherstate->nextreader + 1],
+						sizeof(TupleQueueReader *)
+						* (gatherstate->nreaders - gatherstate->nextreader));
+				if (gatherstate->nextreader >= gatherstate->nreaders)
+					gatherstate->nextreader = 0;
+				if (gatherstate->nextreader < waitpos)
+					--waitpos;
+			}
+			continue;
+		}
+
+		/* Advance nextreader pointer in round-robin fashion. */
+		gatherstate->nextreader =
+			(gatherstate->nextreader + 1) % gatherstate->nreaders;
+
+		/* If we got a tuple, return it. */
+		if (tup)
+			return tup;
+
+		/* Have we visited every TupleQueueReader? */
+		if (gatherstate->nextreader == waitpos)
+		{
+			/*
+			 * If (still) running plan locally, return NULL so caller can
+			 * generate another tuple from the local copy of the plan.
+			 */
+			if (gatherstate->need_to_scan_locally)
+				return NULL;
+
+			/* Nothing to do except wait for developments. */
+			WaitLatch(MyLatch, WL_LATCH_SET, 0);
+			CHECK_FOR_INTERRUPTS();
+			ResetLatch(MyLatch);
+		}
+	}
+}
+
 /* ----------------------------------------------------------------
  *		ExecShutdownGatherWorkers
  *
@@ -320,11 +395,14 @@ gather_getnext(GatherState *gatherstate)
 void
 ExecShutdownGatherWorkers(GatherState *node)
 {
-	/* Shut down tuple queue funnel before shutting down workers. */
-	if (node->funnel != NULL)
+	/* Shut down tuple queue readers before shutting down workers. */
+	if (node->reader != NULL)
 	{
-		DestroyTupleQueueFunnel(node->funnel);
-		node->funnel = NULL;
+		int		i;
+
+		for (i = 0; i < node->nreaders; ++i)
+			DestroyTupleQueueReader(node->reader[i]);
+		node->reader = NULL;
 	}
 
 	/* Now shut down the workers. */
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 67143d3..f465b1d 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -4,10 +4,15 @@
  *	  Use shm_mq to send & receive tuples between parallel backends
  *
  * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
- * under the hood, writes tuples from the executor to a shm_mq.
+ * under the hood, writes tuples from the executor to a shm_mq.  If
+ * necessary, it also writes control messages describing transient
+ * record types used within the tuple.
  *
- * A TupleQueueFunnel helps manage the process of reading tuples from
- * one or more shm_mq objects being used as tuple queues.
+ * A TupleQueueReader reads tuples from a shm_mq, along with any control
+ * messages that were sent, and returns the tuples.  If transient record
+ * types are in use, it registers those types based on the received
+ * control messages and rewrites the typmods sent by the remote side to
+ * the corresponding local record typmods.
  *
  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -21,37 +26,404 @@
 #include "postgres.h"
 
 #include "access/htup_details.h"
+#include "catalog/pg_type.h"
 #include "executor/tqueue.h"
+#include "funcapi.h"
+#include "lib/stringinfo.h"
 #include "miscadmin.h"
+#include "utils/array.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rangetypes.h"
+#include "utils/syscache.h"
+#include "utils/typcache.h"
+
+typedef enum
+{
+	TQUEUE_REMAP_NONE,			/* no special processing required */
+	TQUEUE_REMAP_ARRAY,			/* array */
+	TQUEUE_REMAP_RANGE,			/* range */
+	TQUEUE_REMAP_RECORD			/* composite type, named or anonymous */
+}	RemapClass;
+
+typedef struct
+{
+	int			natts;
+	RemapClass	mapping[FLEXIBLE_ARRAY_MEMBER];
+}	RemapInfo;
 
 typedef struct
 {
 	DestReceiver pub;
 	shm_mq_handle *handle;
+	MemoryContext tmpcontext;
+	HTAB	   *recordhtab;
+	char		mode;
+	TupleDesc	tupledesc;
+	RemapInfo  *remapinfo;
 }	TQueueDestReceiver;
 
-struct TupleQueueFunnel
+typedef struct RecordTypemodMap
 {
-	int			nqueues;
-	int			maxqueues;
-	int			nextqueue;
-	shm_mq_handle **queue;
+	int			remotetypmod;
+	int			localtypmod;
+}	RecordTypemodMap;
+
+struct TupleQueueReader
+{
+	shm_mq_handle *queue;
+	char		mode;
+	TupleDesc	tupledesc;
+	RemapInfo  *remapinfo;
+	HTAB	   *typmodmap;
 };
 
+#define		TUPLE_QUEUE_MODE_CONTROL			'c'
+#define		TUPLE_QUEUE_MODE_DATA				'd'
+
+static void tqueueWalk(TQueueDestReceiver * tqueue, RemapClass walktype,
+		   Datum value);
+static void tqueueWalkRecord(TQueueDestReceiver * tqueue, Datum value);
+static void tqueueWalkArray(TQueueDestReceiver * tqueue, Datum value);
+static void tqueueWalkRange(TQueueDestReceiver * tqueue, Datum value);
+static void tqueueSendTypmodInfo(TQueueDestReceiver * tqueue, int typmod,
+					 TupleDesc tupledesc);
+static void TupleQueueHandleControlMessage(TupleQueueReader *reader,
+							   Size nbytes, char *data);
+static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader,
+							Size nbytes, HeapTupleHeader data);
+static HeapTuple TupleQueueRemapTuple(TupleQueueReader *reader,
+					 TupleDesc tupledesc, RemapInfo * remapinfo,
+					 HeapTuple tuple);
+static Datum TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass,
+				Datum value);
+static Datum TupleQueueRemapArray(TupleQueueReader *reader, Datum value);
+static Datum TupleQueueRemapRange(TupleQueueReader *reader, Datum value);
+static Datum TupleQueueRemapRecord(TupleQueueReader *reader, Datum value);
+static RemapClass GetRemapClass(Oid typeid);
+static RemapInfo *BuildRemapInfo(TupleDesc tupledesc);
+
 /*
  * Receive a tuple.
+ *
+ * This is, at core, pretty simple: just send the tuple to the designated
+ * shm_mq.  The complicated part is that if the tuple contains transient
+ * record types (see lookup_rowtype_tupdesc), we need to send control
+ * information to the shm_mq receiver so that those typemods can be correctly
+ * interpreted, as they are merely held in a backend-local cache.  Worse, the
+ * record type may not be at the top level: we could have a range over an array
+ * type over a range type over a range type over an array type over a record,
+ * or something like that.
  */
 static void
 tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 {
 	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
+	TupleDesc	tupledesc = slot->tts_tupleDescriptor;
 	HeapTuple	tuple;
+	HeapTupleHeader tup;
+
+	/*
+	 * Test to see whether the tupledesc has changed; if so, set up for the
+	 * new tupledesc.  This is a strange test both because the executor really
+	 * shouldn't change the tupledesc, and also because it would be unsafe if
+	 * the old tupledesc could be freed and a new one allocated at the same
+	 * address.  But since some very old code in printtup.c uses this test, we
+	 * adopt it here as well.
+	 */
+	if (tqueue->tupledesc != tupledesc ||
+		tqueue->remapinfo->natts != tupledesc->natts)
+	{
+		if (tqueue->remapinfo != NULL)
+			pfree(tqueue->remapinfo);
+		tqueue->remapinfo = BuildRemapInfo(tupledesc);
+	}
 
 	tuple = ExecMaterializeSlot(slot);
+	tup = tuple->t_data;
+
+	/*
+	 * If the types being transmitted are such that no record typmod mapping
+	 * can possibly be needed, we can skip a good deal of work.
+	 */
+	if (tqueue->remapinfo != NULL)
+	{
+		RemapInfo  *remapinfo = tqueue->remapinfo;
+		AttrNumber	i;
+		MemoryContext oldcontext = NULL;
+
+		/* Deform the tuple so we can examine it, if not done already. */
+		slot_getallattrs(slot);
+
+		/* Iterate over each attribute and search it for transient typemods. */
+		Assert(slot->tts_tupleDescriptor->natts == remapinfo->natts);
+		for (i = 0; i < remapinfo->natts; ++i)
+		{
+			/* Ignore nulls and types that don't need special handling. */
+			if (slot->tts_isnull[i] ||
+				remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
+				continue;
+
+			/* Switch to temporary memory context to avoid leaking. */
+			if (oldcontext == NULL)
+			{
+				if (tqueue->tmpcontext == NULL)
+					tqueue->tmpcontext =
+						AllocSetContextCreate(TopMemoryContext,
+											  "tqueue temporary context",
+											  ALLOCSET_DEFAULT_MINSIZE,
+											  ALLOCSET_DEFAULT_INITSIZE,
+											  ALLOCSET_DEFAULT_MAXSIZE);
+				oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
+			}
+
+			/* Invoke the appropriate walker function. */
+			tqueueWalk(tqueue, remapinfo->mapping[i], slot->tts_values[i]);
+		}
+
+		/* If we used the temp context, reset it and restore prior context. */
+		if (oldcontext != NULL)
+		{
+			MemoryContextSwitchTo(oldcontext);
+			MemoryContextReset(tqueue->tmpcontext);
+		}
+
+		/* If we entered control mode, switch back to data mode. */
+		if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
+		{
+			tqueue->mode = TUPLE_QUEUE_MODE_DATA;
+			shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
+		}
+	}
+
+	/* Send the tuple itself. */
 	shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
 }
 
 /*
+ * Invoke the appropriate walker function based on the given RemapClass.
+ */
+static void
+tqueueWalk(TQueueDestReceiver * tqueue, RemapClass walktype, Datum value)
+{
+	check_stack_depth();
+
+	switch (walktype)
+	{
+		case TQUEUE_REMAP_NONE:
+			break;
+		case TQUEUE_REMAP_ARRAY:
+			tqueueWalkArray(tqueue, value);
+			break;
+		case TQUEUE_REMAP_RANGE:
+			tqueueWalkRange(tqueue, value);
+			break;
+		case TQUEUE_REMAP_RECORD:
+			tqueueWalkRecord(tqueue, value);
+			break;
+	}
+}
+
+/*
+ * Walk a record and send control messages for transient record types
+ * contained therein.
+ */
+static void
+tqueueWalkRecord(TQueueDestReceiver * tqueue, Datum value)
+{
+	HeapTupleHeader tup;
+	Oid			typeid;
+	Oid			typmod;
+	TupleDesc	tupledesc;
+	RemapInfo  *remapinfo;
+
+	/* Extract typmod from tuple. */
+	tup = DatumGetHeapTupleHeader(value);
+	typeid = HeapTupleHeaderGetTypeId(tup);
+	typmod = HeapTupleHeaderGetTypMod(tup);
+
+	/* Look up tuple descriptor in typecache. */
+	tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
+
+	/*
+	 * If this is a transient record type, send its TupleDesc as a control
+	 * message.  (tqueueSendTypemodInfo is smart enough to do this only once
+	 * per typmod.)
+	 */
+	if (typeid == RECORDOID)
+		tqueueSendTypmodInfo(tqueue, typmod, tupledesc);
+
+	/*
+	 * Build the remap information for this tupledesc.  We might want to think
+	 * about keeping a cache of this information keyed by typeid and typemod,
+	 * but let's keep it simple for now.
+	 */
+	remapinfo = BuildRemapInfo(tupledesc);
+
+	/*
+	 * If remapping is required, deform the tuple and process each field. When
+	 * BuildRemapInfo returns NULL, the data types are such that there can be no
+	 * transient record types here, so we can skip all this work.
+	 */
+	if (remapinfo != NULL)
+	{
+		Datum	   *values;
+		bool	   *isnull;
+		HeapTupleData tdata;
+		AttrNumber	i;
+
+		/* Deform the tuple so we can check each column within. */
+		values = palloc(tupledesc->natts * sizeof(Datum));
+		isnull = palloc(tupledesc->natts * sizeof(bool));
+		tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
+		ItemPointerSetInvalid(&(tdata.t_self));
+		tdata.t_tableOid = InvalidOid;
+		tdata.t_data = tup;
+		heap_deform_tuple(&tdata, tupledesc, values, isnull);
+
+		/* Recursively check each non-NULL attribute. */
+		for (i = 0; i < tupledesc->natts; ++i)
+			if (!isnull[i])
+				tqueueWalk(tqueue, remapinfo->mapping[i], values[i]);
+	}
+
+	/* Release reference count acquired by lookup_rowtype_tupdesc. */
+	DecrTupleDescRefCount(tupledesc);
+}
+
+/*
+ * Walk an array and send control messages for transient record types
+ * contained therein.
+ */
+static void
+tqueueWalkArray(TQueueDestReceiver * tqueue, Datum value)
+{
+	ArrayType  *arr = DatumGetArrayTypeP(value);
+	Oid			typeid = ARR_ELEMTYPE(arr);
+	RemapClass	remapclass;
+	int16		typlen;
+	bool		typbyval;
+	char		typalign;
+	Datum	   *elem_values;
+	bool	   *elem_nulls;
+	int			num_elems;
+	int			i;
+
+	remapclass = GetRemapClass(typeid);
+
+	/*
+	 * If the elements of the array don't need to be walked, we shouldn't have
+	 * been called in the first place: GetRemapClass should have returned
+	 * TQUEUE_REMAP_NONE when asked about this array type.
+	 */
+	Assert(remapclass != TQUEUE_REMAP_NONE);
+
+	/* Deconstruct the array. */
+	get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign);
+	deconstruct_array(arr, typeid, typlen, typbyval, typalign,
+					  &elem_values, &elem_nulls, &num_elems);
+
+	/* Walk each element. */
+	for (i = 0; i < num_elems; ++i)
+		if (!elem_nulls[i])
+			tqueueWalk(tqueue, remapclass, elem_values[i]);
+}
+
+/*
+ * Walk a range type and send control messages for transient record types
+ * contained therein.
+ */
+static void
+tqueueWalkRange(TQueueDestReceiver * tqueue, Datum value)
+{
+	RangeType  *range = DatumGetRangeType(value);
+	Oid			typeid = RangeTypeGetOid(range);
+	RemapClass	remapclass;
+	TypeCacheEntry *typcache;
+	RangeBound	lower;
+	RangeBound	upper;
+	bool		empty;
+
+	/*
+	 * Extract the lower and upper bounds.  It might be worth implementing
+	 * some caching scheme here so that we don't look up the same typeids in
+	 * the type cache repeatedly, but for now let's keep it simple.
+	 */
+	typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
+	if (typcache->rngelemtype == NULL)
+		elog(ERROR, "type %u is not a range type", typeid);
+	range_deserialize(typcache, range, &lower, &upper, &empty);
+
+	/* Nothing to do for an empty range. */
+	if (empty)
+		return;
+
+	/*
+	 * If the range bounds don't need to be walked, we shouldn't have been
+	 * called in the first place: GetRemapClass should have returned
+	 * TQUEUE_REMAP_NONE when asked about this range type.
+	 */
+	remapclass = GetRemapClass(typeid);
+	Assert(remapclass != TQUEUE_REMAP_NONE);
+
+	/* Walk each bound, if present. */
+	if (!upper.infinite)
+		tqueueWalk(tqueue, remapclass, upper.val);
+	if (!lower.infinite)
+		tqueueWalk(tqueue, remapclass, lower.val);
+}
+
+/*
+ * Send tuple descriptor information for a transient typemod, unless we've
+ * already done so previously.
+ */
+static void
+tqueueSendTypmodInfo(TQueueDestReceiver * tqueue, int typmod,
+					 TupleDesc tupledesc)
+{
+	StringInfoData buf;
+	bool		found;
+	AttrNumber	i;
+
+	/* Initialize hash table if not done yet. */
+	if (tqueue->recordhtab == NULL)
+	{
+		HASHCTL		ctl;
+
+		ctl.keysize = sizeof(int);
+		ctl.entrysize = sizeof(int);
+		ctl.hcxt = TopMemoryContext;
+		tqueue->recordhtab = hash_create("tqueue record hashtable",
+										 100, &ctl, HASH_ELEM | HASH_CONTEXT);
+	}
+
+	/* Have we already seen this record type?  If not, must report it. */
+	hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
+	if (found)
+		return;
+
+	/* If message queue is in data mode, switch to control mode. */
+	if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
+	{
+		tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
+		shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
+	}
+
+	/* Assemble a control message. */
+	initStringInfo(&buf);
+	appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int));
+	appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
+	appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid,
+						   sizeof(bool));
+	for (i = 0; i < tupledesc->natts; ++i)
+		appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i],
+							   sizeof(FormData_pg_attribute));
+
+	/* Send control message. */
+	shm_mq_send(tqueue->handle, buf.len, buf.data, false);
+}
+
+/*
  * Prepare to receive tuples from executor.
  */
 static void
@@ -77,6 +449,14 @@ tqueueShutdownReceiver(DestReceiver *self)
 static void
 tqueueDestroyReceiver(DestReceiver *self)
 {
+	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
+
+	if (tqueue->tmpcontext != NULL)
+		MemoryContextDelete(tqueue->tmpcontext);
+	if (tqueue->recordhtab != NULL)
+		hash_destroy(tqueue->recordhtab);
+	if (tqueue->remapinfo != NULL)
+		pfree(tqueue->remapinfo);
 	pfree(self);
 }
 
@@ -96,169 +476,542 @@ CreateTupleQueueDestReceiver(shm_mq_handle *handle)
 	self->pub.rDestroy = tqueueDestroyReceiver;
 	self->pub.mydest = DestTupleQueue;
 	self->handle = handle;
+	self->tmpcontext = NULL;
+	self->recordhtab = NULL;
+	self->mode = TUPLE_QUEUE_MODE_DATA;
+	self->remapinfo = NULL;
 
 	return (DestReceiver *) self;
 }
 
 /*
- * Create a tuple queue funnel.
+ * Create a tuple queue reader.
  */
-TupleQueueFunnel *
-CreateTupleQueueFunnel(void)
+TupleQueueReader *
+CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
 {
-	TupleQueueFunnel *funnel = palloc0(sizeof(TupleQueueFunnel));
+	TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
 
-	funnel->maxqueues = 8;
-	funnel->queue = palloc(funnel->maxqueues * sizeof(shm_mq_handle *));
+	reader->queue = handle;
+	reader->mode = TUPLE_QUEUE_MODE_DATA;
+	reader->tupledesc = tupledesc;
+	reader->remapinfo = BuildRemapInfo(tupledesc);
 
-	return funnel;
+	return reader;
 }
 
 /*
- * Destroy a tuple queue funnel.
+ * Destroy a tuple queue reader.
  */
 void
-DestroyTupleQueueFunnel(TupleQueueFunnel *funnel)
+DestroyTupleQueueReader(TupleQueueReader *reader)
 {
-	int			i;
+	shm_mq_detach(shm_mq_get_queue(reader->queue));
+	if (reader->remapinfo != NULL)
+		pfree(reader->remapinfo);
+	pfree(reader);
+}
+
+/*
+ * Fetch a tuple from a tuple queue reader.
+ *
+ * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
+ * accumulate bytes from a partially-read message, so it's useful to call
+ * this with nowait = true even if nothing is returned.
+ *
+ * The return value is NULL if the queue is detached or if nowait = true
+ * and no tuple is ready to return.  *done, if not NULL, is set to true
+ * when the queue is detached and otherwise to false.
+ */
+HeapTuple
+TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
+{
+	shm_mq_result result;
+
+	if (done != NULL)
+		*done = false;
+
+	for (;;)
+	{
+		Size		nbytes;
+		void	   *data;
+
+		/* Attempt to read a message. */
+		result = shm_mq_receive(reader->queue, &nbytes, &data, true);
+
+		/* If queue is detached, set *done and return NULL. */
+		if (result == SHM_MQ_DETACHED)
+		{
+			if (done != NULL)
+				*done = true;
+			return NULL;
+		}
+
+		/* In non-blocking mode, bail out if no message ready yet. */
+		if (result == SHM_MQ_WOULD_BLOCK)
+			return NULL;
+		Assert(result == SHM_MQ_SUCCESS);
 
-	for (i = 0; i < funnel->nqueues; i++)
-		shm_mq_detach(shm_mq_get_queue(funnel->queue[i]));
-	pfree(funnel->queue);
-	pfree(funnel);
+		/*
+		 * OK, we got a message.  Process it.
+		 *
+		 * One-byte messages are mode switch messages, so that we can switch
+		 * between "control" and "data" mode.  When in "data" mode, each
+		 * message (unless exactly one byte) is a tuple.  When in "control"
+		 * mode, each message provides a transient-typmod-to-tupledesc mapping
+		 * so we can interpret future tuples.
+		 */
+		if (nbytes == 1)
+		{
+			/* Mode switch message. */
+			reader->mode = ((char *) data)[0];
+		}
+		else if (reader->mode == TUPLE_QUEUE_MODE_DATA)
+		{
+			/* Tuple data. */
+			return TupleQueueHandleDataMessage(reader, nbytes, data);
+		}
+		else if (reader->mode == TUPLE_QUEUE_MODE_CONTROL)
+		{
+			/* Control message, describing a transient record type. */
+			TupleQueueHandleControlMessage(reader, nbytes, data);
+		}
+		else
+			elog(ERROR, "invalid mode: %d", (int) reader->mode);
+	}
 }
 
 /*
- * Remember the shared memory queue handle in funnel.
+ * Handle a data message - that is, a tuple - from the remote side.
  */
-void
-RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
+static HeapTuple
+TupleQueueHandleDataMessage(TupleQueueReader *reader,
+							Size nbytes,
+							HeapTupleHeader data)
 {
-	if (funnel->nqueues < funnel->maxqueues)
+	HeapTupleData htup;
+
+	ItemPointerSetInvalid(&htup.t_self);
+	htup.t_tableOid = InvalidOid;
+	htup.t_len = nbytes;
+	htup.t_data = data;
+
+	return TupleQueueRemapTuple(reader, reader->tupledesc, reader->remapinfo,
+								&htup);
+}
+
+/*
+ * Remap tuple typmods per control information received from remote side.
+ */
+static HeapTuple
+TupleQueueRemapTuple(TupleQueueReader *reader, TupleDesc tupledesc,
+					 RemapInfo * remapinfo, HeapTuple tuple)
+{
+	Datum	   *values;
+	bool	   *isnull;
+	bool		dirty = false;
+	int			i;
+
+	/*
+	 * If no remapping is necessary, just copy the tuple into a single
+	 * palloc'd chunk, as caller will expect.
+	 */
+	if (remapinfo == NULL)
+		return heap_copytuple(tuple);
+
+	/* Deform tuple so we can remap record typmods for individual attrs. */
+	values = palloc(tupledesc->natts * sizeof(Datum));
+	isnull = palloc(tupledesc->natts * sizeof(bool));
+	heap_deform_tuple(tuple, tupledesc, values, isnull);
+	Assert(tupledesc->natts == remapinfo->natts);
+
+	/* Recursively check each non-NULL attribute. */
+	for (i = 0; i < tupledesc->natts; ++i)
 	{
-		funnel->queue[funnel->nqueues++] = handle;
-		return;
+		if (isnull[i] || remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
+			continue;
+		values[i] = TupleQueueRemap(reader, remapinfo->mapping[i], values[i]);
+		dirty = true;
 	}
 
-	if (funnel->nqueues >= funnel->maxqueues)
+	/* Reform the modified tuple. */
+	return heap_form_tuple(tupledesc, values, isnull);
+}
+
+/*
+ * Remap a value based on the specified remap class.
+ */
+static Datum
+TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass, Datum value)
+{
+	check_stack_depth();
+
+	switch (remapclass)
 	{
-		int			newsize = funnel->nqueues * 2;
+		case TQUEUE_REMAP_NONE:
+			/* caller probably shouldn't have called us at all, but... */
+			return value;
+
+		case TQUEUE_REMAP_ARRAY:
+			return TupleQueueRemapArray(reader, value);
 
-		Assert(funnel->nqueues == funnel->maxqueues);
+		case TQUEUE_REMAP_RANGE:
+			return TupleQueueRemapRange(reader, value);
 
-		funnel->queue = repalloc(funnel->queue,
-								 newsize * sizeof(shm_mq_handle *));
-		funnel->maxqueues = newsize;
+		case TQUEUE_REMAP_RECORD:
+			return TupleQueueRemapRecord(reader, value);
 	}
+}
 
-	funnel->queue[funnel->nqueues++] = handle;
+/*
+ * Remap an array.
+ */
+static Datum
+TupleQueueRemapArray(TupleQueueReader *reader, Datum value)
+{
+	ArrayType  *arr = DatumGetArrayTypeP(value);
+	Oid			typeid = ARR_ELEMTYPE(arr);
+	RemapClass	remapclass;
+	int16		typlen;
+	bool		typbyval;
+	char		typalign;
+	Datum	   *elem_values;
+	bool	   *elem_nulls;
+	int			num_elems;
+	int			i;
+
+	remapclass = GetRemapClass(typeid);
+
+	/*
+	 * If the elements of the array don't need to be walked, we shouldn't have
+	 * been called in the first place: GetRemapClass should have returned
+	 * TQUEUE_REMAP_NONE when asked about this array type.
+	 */
+	Assert(remapclass != TQUEUE_REMAP_NONE);
+
+	/* Deconstruct the array. */
+	get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign);
+	deconstruct_array(arr, typeid, typlen, typbyval, typalign,
+					  &elem_values, &elem_nulls, &num_elems);
+
+	/* Remap each element. */
+	for (i = 0; i < num_elems; ++i)
+		if (!elem_nulls[i])
+			elem_values[i] = TupleQueueRemap(reader, remapclass,
+											 elem_values[i]);
+
+	/* Reconstruct and return the array.  */
+	arr = construct_md_array(elem_values, elem_nulls,
+							 ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
+							 typeid, typlen, typbyval, typalign);
+	return PointerGetDatum(arr);
 }
 
 /*
- * Fetch a tuple from a tuple queue funnel.
- *
- * We try to read from the queues in round-robin fashion so as to avoid
- * the situation where some workers get their tuples read expediently while
- * others are barely ever serviced.
- *
- * Even when nowait = false, we read from the individual queues in
- * non-blocking mode.  Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK,
- * it can still accumulate bytes from a partially-read message, so doing it
- * this way should outperform doing a blocking read on each queue in turn.
- *
- * The return value is NULL if there are no remaining queues or if
- * nowait = true and no queue returned a tuple without blocking.  *done, if
- * not NULL, is set to true when there are no remaining queues and false in
- * any other case.
+ * Remap a range type.
  */
-HeapTuple
-TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
+static Datum
+TupleQueueRemapRange(TupleQueueReader *reader, Datum value)
 {
-	int			waitpos = funnel->nextqueue;
+	RangeType  *range = DatumGetRangeType(value);
+	Oid			typeid = RangeTypeGetOid(range);
+	RemapClass	remapclass;
+	TypeCacheEntry *typcache;
+	RangeBound	lower;
+	RangeBound	upper;
+	bool		empty;
+
+	/*
+	 * Extract the lower and upper bounds.  As in tqueueWalkRange, some
+	 * caching might be a good idea here.
+	 */
+	typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
+	if (typcache->rngelemtype == NULL)
+		elog(ERROR, "type %u is not a range type", typeid);
+	range_deserialize(typcache, range, &lower, &upper, &empty);
+
+	/* Nothing to do for an empty range. */
+	if (empty)
+		return value;
+
+	/*
+	 * If the range bounds don't need to be walked, we shouldn't have been
+	 * called in the first place: GetRemapClass should have returned
+	 * TQUEUE_REMAP_NONE when asked about this range type.
+	 */
+	remapclass = GetRemapClass(typeid);
+	Assert(remapclass != TQUEUE_REMAP_NONE);
+
+	/* Remap each bound, if present. */
+	if (!upper.infinite)
+		upper.val = TupleQueueRemap(reader, remapclass, upper.val);
+	if (!lower.infinite)
+		lower.val = TupleQueueRemap(reader, remapclass, lower.val);
+
+	/* And reserialize. */
+	range = range_serialize(typcache, &lower, &upper, empty);
+	return RangeTypeGetDatum(range);
+}
 
-	/* Corner case: called before adding any queues, or after all are gone. */
-	if (funnel->nqueues == 0)
+/*
+ * Remap a record.
+ */
+static Datum
+TupleQueueRemapRecord(TupleQueueReader *reader, Datum value)
+{
+	HeapTupleHeader tup;
+	Oid			typeid;
+	int			typmod;
+	RecordTypemodMap *mapent;
+	TupleDesc	tupledesc;
+	RemapInfo  *remapinfo;
+	HeapTupleData htup;
+	HeapTuple	atup;
+
+	/* Fetch type OID and typemod. */
+	tup = DatumGetHeapTupleHeader(value);
+	typeid = HeapTupleHeaderGetTypeId(tup);
+	typmod = HeapTupleHeaderGetTypMod(tup);
+
+	/* If transient record, replace remote typmod with local typmod. */
+	if (typeid == RECORDOID)
 	{
-		if (done != NULL)
-			*done = true;
-		return NULL;
+		Assert(reader->typmodmap != NULL);
+		mapent = hash_search(reader->typmodmap, &typmod,
+							 HASH_FIND, NULL);
+		if (mapent == NULL)
+			elog(ERROR, "found unrecognized remote typmod %d", typmod);
+		typmod = mapent->localtypmod;
 	}
 
-	if (done != NULL)
-		*done = false;
+	/*
+	 * Fetch tupledesc and compute remap info.  We should probably cache this
+	 * so that we don't have to keep recomputing it.
+	 */
+	tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
+	remapinfo = BuildRemapInfo(tupledesc);
+	DecrTupleDescRefCount(tupledesc);
+
+	/* Remap tuple. */
+	ItemPointerSetInvalid(&htup.t_self);
+	htup.t_tableOid = InvalidOid;
+	htup.t_len = HeapTupleHeaderGetDatumLength(tup);
+	htup.t_data = tup;
+	atup = TupleQueueRemapTuple(reader, tupledesc, remapinfo, &htup);
+	HeapTupleHeaderSetTypeId(atup->t_data, typeid);
+	HeapTupleHeaderSetTypMod(atup->t_data, typmod);
+	HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len);
+
+	/* And return the results. */
+	return HeapTupleHeaderGetDatum(atup->t_data);
+}
 
-	for (;;)
+/*
+ * Handle a control message from the remote side.
+ *
+ * Control messages are sent when the remote side is sending tuples that
+ * contain transient record types.  We need to arrange to bless those
+ * record types locally and translate between remote and local typmods.
+ */
+static void
+TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
+							   char *data)
+{
+	int			natts;
+	int			remotetypmod;
+	bool		hasoid;
+	char	   *buf = data;
+	int			rc = 0;
+	int			i;
+	Form_pg_attribute *attrs;
+	MemoryContext oldcontext;
+	TupleDesc	tupledesc;
+	RecordTypemodMap *mapent;
+	bool		found;
+
+	/* Extract remote typmod. */
+	memcpy(&remotetypmod, &buf[rc], sizeof(int));
+	rc += sizeof(int);
+
+	/* Extract attribute count. */
+	memcpy(&natts, &buf[rc], sizeof(int));
+	rc += sizeof(int);
+
+	/* Extract hasoid flag. */
+	memcpy(&hasoid, &buf[rc], sizeof(bool));
+	rc += sizeof(bool);
+
+	/* Extract attribute details. */
+	oldcontext = MemoryContextSwitchTo(CurTransactionContext);
+	attrs = palloc(natts * sizeof(Form_pg_attribute));
+	for (i = 0; i < natts; ++i)
 	{
-		shm_mq_handle *mqh = funnel->queue[funnel->nextqueue];
-		shm_mq_result result;
-		Size		nbytes;
-		void	   *data;
+		attrs[i] = palloc(sizeof(FormData_pg_attribute));
+		memcpy(attrs[i], &buf[rc], sizeof(FormData_pg_attribute));
+		rc += sizeof(FormData_pg_attribute);
+	}
+	MemoryContextSwitchTo(oldcontext);
 
-		/* Attempt to read a message. */
-		result = shm_mq_receive(mqh, &nbytes, &data, true);
+	/* We should have read the whole message. */
+	Assert(rc == nbytes);
 
-		/*
-		 * Normally, we advance funnel->nextqueue to the next queue at this
-		 * point, but if we're pointing to a queue that we've just discovered
-		 * is detached, then forget that queue and leave the pointer where it
-		 * is until the number of remaining queues fall below that pointer and
-		 * at that point make the pointer point to the first queue.
-		 */
-		if (result != SHM_MQ_DETACHED)
-			funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues;
-		else
+	/* Construct TupleDesc. */
+	tupledesc = CreateTupleDesc(natts, hasoid, attrs);
+	tupledesc = BlessTupleDesc(tupledesc);
+
+	/* Create map if it doesn't exist already. */
+	if (reader->typmodmap == NULL)
+	{
+		HASHCTL		ctl;
+
+		ctl.keysize = sizeof(int);
+		ctl.entrysize = sizeof(RecordTypemodMap);
+		ctl.hcxt = CurTransactionContext;
+		reader->typmodmap = hash_create("typmodmap hashtable",
+										100, &ctl, HASH_ELEM | HASH_CONTEXT);
+	}
+
+	/* Create map entry. */
+	mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
+						 &found);
+	if (found)
+		elog(ERROR, "duplicate message for typmod %d",
+			 remotetypmod);
+	mapent->localtypmod = tupledesc->tdtypmod;
+	elog(DEBUG3, "mapping remote typmod %d to local typmod %d",
+		 remotetypmod, tupledesc->tdtypmod);
+}
+
+/*
+ * Build a mapping indicating what remapping class applies to each attribute
+ * described by a tupledesc.
+ */
+static RemapInfo *
+BuildRemapInfo(TupleDesc tupledesc)
+{
+	RemapInfo  *remapinfo;
+	Size		size;
+	AttrNumber	i;
+	bool		noop = true;
+	StringInfoData buf;
+
+	initStringInfo(&buf);
+
+	size = offsetof(RemapInfo, mapping) +
+		sizeof(RemapClass) * tupledesc->natts;
+	remapinfo = MemoryContextAllocZero(TopMemoryContext, size);
+	remapinfo->natts = tupledesc->natts;
+	for (i = 0; i < tupledesc->natts; ++i)
+	{
+		Form_pg_attribute attr = tupledesc->attrs[i];
+
+		if (attr->attisdropped)
 		{
-			--funnel->nqueues;
-			if (funnel->nqueues == 0)
-			{
-				if (done != NULL)
-					*done = true;
-				return NULL;
-			}
+			remapinfo->mapping[i] = TQUEUE_REMAP_NONE;
+			continue;
+		}
 
-			memmove(&funnel->queue[funnel->nextqueue],
-					&funnel->queue[funnel->nextqueue + 1],
-					sizeof(shm_mq_handle *)
-					* (funnel->nqueues - funnel->nextqueue));
+		remapinfo->mapping[i] = GetRemapClass(attr->atttypid);
+		if (remapinfo->mapping[i] != TQUEUE_REMAP_NONE)
+			noop = false;
+	}
+
+	if (noop)
+	{
+		appendStringInfo(&buf, "noop");
+		pfree(remapinfo);
+		remapinfo = NULL;
+	}
+
+	return remapinfo;
+}
+
+/*
+ * Determine the remap class associated with a particular data type.
+ *
+ * Transient record types need to have the typmod applied on the sending side
+ * replaced with a value on the receiving side that has the same meaning.
+ *
+ * Arrays, range types, and all record types (including named composite types)
+ * need to be searched for transient record values buried within them.
+ * Surprisingly, a walker is required even when the indicated type is a
+ * composite type, because the actual value may be a compatible transient
+ * record type.
+ */
+static RemapClass
+GetRemapClass(Oid typeid)
+{
+	RemapClass	forceResult = TQUEUE_REMAP_NONE;
+	RemapClass	innerResult = TQUEUE_REMAP_NONE;
+
+	for (;;)
+	{
+		HeapTuple	tup;
+		Form_pg_type typ;
 
-			if (funnel->nextqueue >= funnel->nqueues)
-				funnel->nextqueue = 0;
+		/* Simple cases. */
+		if (typeid == RECORDOID)
+		{
+			innerResult = TQUEUE_REMAP_RECORD;
+			break;
+		}
+		if (typeid == RECORDARRAYOID)
+		{
+			innerResult = TQUEUE_REMAP_ARRAY;
+			break;
+		}
 
-			if (funnel->nextqueue < waitpos)
-				--waitpos;
+		/* Otherwise, we need a syscache lookup to figure it out. */
+		tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeid));
+		if (!HeapTupleIsValid(tup))
+			elog(ERROR, "cache lookup failed for type %u", typeid);
+		typ = (Form_pg_type) GETSTRUCT(tup);
 
+		/* Look through domains to underlying base type. */
+		if (typ->typtype == TYPTYPE_DOMAIN)
+		{
+			typeid = typ->typbasetype;
+			ReleaseSysCache(tup);
 			continue;
 		}
 
-		/* If we got a message, return it. */
-		if (result == SHM_MQ_SUCCESS)
+		/*
+		 * Look through arrays to underlying base type, but the final return
+		 * value must be either TQUEUE_REMAP_ARRAY or TQUEUE_REMAP_NONE.  (If
+		 * this is an array of integers, for example, we don't need to walk
+		 * it.)
+		 */
+		if (OidIsValid(typ->typelem) && typ->typlen == -1)
 		{
-			HeapTupleData htup;
-
-			/*
-			 * The tuple data we just read from the queue is only valid until
-			 * we again attempt to read from it.  Copy the tuple into a single
-			 * palloc'd chunk as callers will expect.
-			 */
-			ItemPointerSetInvalid(&htup.t_self);
-			htup.t_tableOid = InvalidOid;
-			htup.t_len = nbytes;
-			htup.t_data = data;
-			return heap_copytuple(&htup);
+			typeid = typ->typelem;
+			ReleaseSysCache(tup);
+			if (forceResult == TQUEUE_REMAP_NONE)
+				forceResult = TQUEUE_REMAP_ARRAY;
+			continue;
 		}
 
 		/*
-		 * If we've visited all of the queues, then we should either give up
-		 * and return NULL (if we're in non-blocking mode) or wait for the
-		 * process latch to be set (otherwise).
+		 * Similarly, look through ranges to the underlying base type, but the
+		 * final return value must be either TQUEUE_REMAP_RANGE or
+		 * TQUEUE_REMAP_NONE.
 		 */
-		if (funnel->nextqueue == waitpos)
+		if (typ->typtype == TYPTYPE_RANGE)
 		{
-			if (nowait)
-				return NULL;
-			WaitLatch(MyLatch, WL_LATCH_SET, 0);
-			CHECK_FOR_INTERRUPTS();
-			ResetLatch(MyLatch);
+			ReleaseSysCache(tup);
+			if (forceResult == TQUEUE_REMAP_NONE)
+				forceResult = TQUEUE_REMAP_RANGE;
+			typeid = get_range_subtype(typeid);
+			continue;
 		}
+
+		/* Walk composite types.  Nothing else needs special handling. */
+		if (typ->typtype == TYPTYPE_COMPOSITE)
+			innerResult = TQUEUE_REMAP_RECORD;
+		ReleaseSysCache(tup);
+		break;
 	}
+
+	if (innerResult != TQUEUE_REMAP_NONE && forceResult != TQUEUE_REMAP_NONE)
+		return forceResult;
+	return innerResult;
 }
diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h
index 6f8eb73..6a668fa 100644
--- a/src/include/executor/tqueue.h
+++ b/src/include/executor/tqueue.h
@@ -21,11 +21,11 @@
 extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
 
 /* Use these to receive tuples from a shm_mq. */
-typedef struct TupleQueueFunnel TupleQueueFunnel;
-extern TupleQueueFunnel *CreateTupleQueueFunnel(void);
-extern void DestroyTupleQueueFunnel(TupleQueueFunnel *funnel);
-extern void RegisterTupleQueueOnFunnel(TupleQueueFunnel *, shm_mq_handle *);
-extern HeapTuple TupleQueueFunnelNext(TupleQueueFunnel *, bool nowait,
-					 bool *done);
+typedef struct TupleQueueReader TupleQueueReader;
+extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle,
+					   TupleDesc tupledesc);
+extern void DestroyTupleQueueReader(TupleQueueReader *reader);
+extern HeapTuple TupleQueueReaderNext(TupleQueueReader *,
+					 bool nowait, bool *done);
 
 #endif   /* TQUEUE_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 939bc0e..58ec889 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1963,7 +1963,9 @@ typedef struct GatherState
 	PlanState	ps;				/* its first field is NodeTag */
 	bool		initialized;
 	struct ParallelExecutorInfo *pei;
-	struct TupleQueueFunnel *funnel;
+	int			nreaders;
+	int			nextreader;
+	struct TupleQueueReader **reader;
 	TupleTableSlot *funnel_slot;
 	bool		need_to_scan_locally;
 } GatherState;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index feb821b..03e1d2c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2018,7 +2018,7 @@ TupleHashEntry
 TupleHashEntryData
 TupleHashIterator
 TupleHashTable
-TupleQueueFunnel
+TupleQueueReader
 TupleTableSlot
 Tuplesortstate
 Tuplestorestate
-- 
2.3.8 (Apple Git-58)
