diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ce47f1d4a8..813a80e42d 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -47,16 +47,22 @@
  * greater than any 32-bit integer here so that values < 2^32 can be used
  * by individual parallel nodes to store their own state.
  */
-#define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000001)
-#define PARALLEL_KEY_PARAMS				UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000003)
-#define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000004)
-#define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000005)
-#define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000006)
-#define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_EXECUTOR_FIXED		UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_PARAMS				UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
+typedef struct FixedParallelExecutorState
+{
+	int64		tuples_needed;
+} FixedParallelExecutorState;
+
 /*
  * DSM structure for accumulating per-PlanState instrumentation.
  *
@@ -77,6 +83,7 @@ struct SharedExecutorInstrumentation
 {
 	int			instrument_options;
 	int			instrument_offset;
+	int64		tuple_bound;
 	int			num_workers;
 	int			num_plan_nodes;
 	int			plan_node_id[FLEXIBLE_ARRAY_MEMBER];
@@ -381,12 +388,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei)
  * execution and return results to the main backend.
  */
 ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+					 int64 tuples_needed)
 {
 	ParallelExecutorInfo *pei;
 	ParallelContext *pcxt;
 	ExecParallelEstimateContext e;
 	ExecParallelInitializeDSMContext d;
+	FixedParallelExecutorState *fpes;
 	char	   *pstmt_data;
 	char	   *pstmt_space;
 	char	   *param_space;
@@ -418,6 +427,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	 * for the various things we need to store.
 	 */
 
+	/* Estimate space for fixed-size state. */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   sizeof(FixedParallelExecutorState));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Estimate space for query text. */
 	query_len = strlen(estate->es_sourceText);
 	shm_toc_estimate_chunk(&pcxt->estimator, query_len);
@@ -487,6 +501,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	 * asked for has been allocated or initialized yet, though, so do that.
 	 */
 
+	/* Store fixed-size state. */
+	fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
+	fpes->tuples_needed = tuples_needed;
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
+
 	/* Store query string */
 	query_string = shm_toc_allocate(pcxt->toc, query_len);
 	memcpy(query_string, estate->es_sourceText, query_len);
@@ -833,6 +852,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
 void
 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 {
+	FixedParallelExecutorState *fpes;
 	BufferUsage *buffer_usage;
 	DestReceiver *receiver;
 	QueryDesc  *queryDesc;
@@ -841,6 +861,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	void	   *area_space;
 	dsa_area   *area;
 
+	/* Get fixed-size state. */
+	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
 	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
 	receiver = ExecParallelGetReceiver(seg, toc);
 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
@@ -868,6 +891,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	queryDesc->planstate->state->es_query_dsa = area;
 	ExecParallelInitializeWorker(queryDesc->planstate, toc);
 
+	/* Pass down any tuple bound */
+	ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
+
 	/* Run the plan */
 	ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
 
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 36d2914249..d1abd20cd1 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -757,3 +757,116 @@ ExecShutdownNode(PlanState *node)
 
 	return false;
 }
+
+/*
+ * Set a tuple bound for a planstate node.  This lets us optimize based on
+ * the knowledge that the maximum number of tuples which must be returned is
+ * limited.
+ *
+ * This is a bit of a kluge, but we don't have any more-abstract way of
+ * communicating between the two nodes; and it doesn't seem worth trying
+ * to invent one without some more examples of special communication needs.
+ */
+void
+ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
+{
+	/*
+	 * If the child is a subquery that does no filtering (no predicates)
+	 * and does not have any SRFs in the target list then we can potentially
+	 * push the tuple bound through the subquery. It is possible that we could
+	 * have multiple subqueries, so tunnel through them all.
+	 */
+	while (IsA(child_node, SubqueryScanState))
+	{
+		SubqueryScanState *subqueryScanState;
+
+		subqueryScanState = (SubqueryScanState *) child_node;
+
+		/*
+		 * Non-empty predicates or an SRF means we cannot push down the bound.
+		 */
+		if (subqueryScanState->ss.ps.qual != NULL ||
+			expression_returns_set((Node *) child_node->plan->targetlist))
+			return;
+
+		/* Use the child in the following checks */
+		child_node = subqueryScanState->subplan;
+	}
+
+	if (IsA(child_node, SortState))
+	{
+		SortState  *sortState = castNode(SortState, child_node);
+
+		/*
+		 * If our input is a Sort node, notify it that it can use bounded sort.
+		 *
+		 * Note: it is the responsibility of nodeSort.c to react properly to
+		 * changes of these parameters.  If we ever do redesign this, it'd be a
+		 * good idea to integrate this signaling with the parameter-change
+		 * mechanism.
+		 */
+
+		/* negative test checks for overflow in sum */
+		if (tuples_needed < 0)
+		{
+			/* make sure flag gets reset if needed upon rescan */
+			sortState->bounded = false;
+		}
+		else
+		{
+			sortState->bounded = true;
+			sortState->bound = tuples_needed;
+		}
+	}
+	else if (IsA(child_node, MergeAppendState))
+	{
+		MergeAppendState *maState = castNode(MergeAppendState, child_node);
+		int			i;
+
+		/*
+		 * Also, if our input is a MergeAppend, we can apply the same bound to
+		 * any Sorts that are direct children of the MergeAppend, since the
+		 * MergeAppend surely need read no more than that many tuples from
+		 * any one input.
+		 */
+		for (i = 0; i < maState->ms_nplans; i++)
+			ExecSetTupleBound(tuples_needed, maState->mergeplans[i]);
+	}
+	else if (IsA(child_node, ResultState))
+	{
+		/*
+		 * We also have to be prepared to look through a Result, since the
+		 * planner might stick one atop MergeAppend for projection purposes.
+		 *
+		 * If Result supported qual checking, we'd have to punt on seeing a
+		 * qual.  Note that having a resconstantqual is not a showstopper: if
+		 * that fails we're not getting any rows at all.
+		 */
+		if (outerPlanState(child_node))
+			ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+	}
+	else if (IsA(child_node, Gather))
+	{
+		GatherState *gstate = castNode(GatherState, child_node);
+
+		/*
+		 * We might have a Gather node, which can propagate the bound to
+		 * workers.
+		 *
+		 * Note: As with Sort, the Gather node is responsible for reacting
+		 * properly to changes to this parameter.
+		 */
+		gstate->tuples_needed = tuples_needed;
+
+		/* we should also pass this down to our own copy of the child plan */
+		ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+	}
+	else if (IsA(child_node, GatherMerge))
+	{
+		GatherMergeState *gstate = castNode(GatherMergeState, child_node);
+
+		/* Same idea here as for Gather */
+		gstate->tuples_needed = tuples_needed;
+		ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+	}
+}
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index e8d94ee6f3..a0f5a60d93 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -72,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate->ps.state = estate;
 	gatherstate->ps.ExecProcNode = ExecGather;
 	gatherstate->need_to_scan_locally = !node->single_copy;
+	gatherstate->tuples_needed = -1;
 
 	/*
 	 * Miscellaneous initialization
@@ -156,7 +157,8 @@ ExecGather(PlanState *pstate)
 			if (!node->pei)
 				node->pei = ExecInitParallelPlan(node->ps.lefttree,
 												 estate,
-												 gather->num_workers);
+												 gather->num_workers,
+												 node->tuples_needed);
 
 			/*
 			 * Register backend workers. We might not get as many as we
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 64c62398bb..2526c584fd 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -77,6 +77,7 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
 	gm_state->ps.plan = (Plan *) node;
 	gm_state->ps.state = estate;
 	gm_state->ps.ExecProcNode = ExecGatherMerge;
+	gm_state->tuples_needed = -1;
 
 	/*
 	 * Miscellaneous initialization
@@ -190,7 +191,8 @@ ExecGatherMerge(PlanState *pstate)
 			if (!node->pei)
 				node->pei = ExecInitParallelPlan(node->ps.lefttree,
 												 estate,
-												 gm->num_workers);
+												 gm->num_workers,
+												 node->tuples_needed);
 
 			/* Try to launch workers. */
 			pcxt = node->pei->pcxt;
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index 09af1a5d8b..1ee035b5e5 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -27,7 +27,7 @@
 #include "nodes/nodeFuncs.h"
 
 static void recompute_limits(LimitState *node);
-static void pass_down_bound(LimitState *node, PlanState *child_node);
+static int64 compute_tuples_needed(LimitState *node);
 
 
 /* ----------------------------------------------------------------
@@ -298,89 +298,18 @@ recompute_limits(LimitState *node)
 	node->lstate = LIMIT_RESCAN;
 
 	/* Notify child node about limit, if useful */
-	pass_down_bound(node, outerPlanState(node));
+	ExecSetTupleBound(compute_tuples_needed(node), outerPlanState(node));
 }
 
 /*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort.  Also, if our input is a MergeAppend, we can apply the
- * same bound to any Sorts that are direct children of the MergeAppend,
- * since the MergeAppend surely need read no more than that many tuples from
- * any one input.  We also have to be prepared to look through a Result,
- * since the planner might stick one atop MergeAppend for projection purposes.
- * We can also accept one or more levels of subqueries that have no quals or
- * SRFs (that is, each subquery is just projecting columns) between the LIMIT
- * and any of the above.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters.  If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
+ * Compute the number of tuples needed to satisfy a Limit node.
  */
-static void
-pass_down_bound(LimitState *node, PlanState *child_node)
+static int64
+compute_tuples_needed(LimitState *node)
 {
-	/*
-	 * If the child is a subquery that does no filtering (no predicates)
-	 * and does not have any SRFs in the target list then we can potentially
-	 * push the limit through the subquery. It is possible that we could have
-	 * multiple subqueries, so tunnel through them all.
-	 */
-	while (IsA(child_node, SubqueryScanState))
-	{
-		SubqueryScanState *subqueryScanState;
-
-		subqueryScanState = (SubqueryScanState *) child_node;
-
-		/*
-		 * Non-empty predicates or an SRF means we cannot push down the limit.
-		 */
-		if (subqueryScanState->ss.ps.qual != NULL ||
-			expression_returns_set((Node *) child_node->plan->targetlist))
-			return;
-
-		/* Use the child in the following checks */
-		child_node = subqueryScanState->subplan;
-	}
-
-	if (IsA(child_node, SortState))
-	{
-		SortState  *sortState = (SortState *) child_node;
-		int64		tuples_needed = node->count + node->offset;
-
-		/* negative test checks for overflow in sum */
-		if (node->noCount || tuples_needed < 0)
-		{
-			/* make sure flag gets reset if needed upon rescan */
-			sortState->bounded = false;
-		}
-		else
-		{
-			sortState->bounded = true;
-			sortState->bound = tuples_needed;
-		}
-	}
-	else if (IsA(child_node, MergeAppendState))
-	{
-		MergeAppendState *maState = (MergeAppendState *) child_node;
-		int			i;
-
-		for (i = 0; i < maState->ms_nplans; i++)
-			pass_down_bound(node, maState->mergeplans[i]);
-	}
-	else if (IsA(child_node, ResultState))
-	{
-		/*
-		 * If Result supported qual checking, we'd have to punt on seeing a
-		 * qual.  Note that having a resconstantqual is not a showstopper: if
-		 * that fails we're not getting any rows at all.
-		 */
-		if (outerPlanState(child_node))
-			pass_down_bound(node, outerPlanState(child_node));
-	}
+	if (node->noCount)
+		return -1;
+	return node->count + node->offset;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index bd0a87fa04..79b886706f 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -33,7 +33,7 @@ typedef struct ParallelExecutorInfo
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
-					 EState *estate, int nworkers);
+					 EState *estate, int nworkers, int64 tuples_needed);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index eacbea3c36..f48a603dae 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -232,6 +232,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
+extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
 
 
 /* ----------------------------------------------------------------
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 3272c4b315..17ae657022 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1919,6 +1919,7 @@ typedef struct GatherState
 	struct TupleQueueReader **reader;
 	TupleTableSlot *funnel_slot;
 	bool		need_to_scan_locally;
+	int64		tuples_needed;
 } GatherState;
 
 /* ----------------
@@ -1944,6 +1945,7 @@ typedef struct GatherMergeState
 	struct binaryheap *gm_heap; /* binary heap of slot indices */
 	bool		gm_initialized; /* gather merge initilized ? */
 	bool		need_to_scan_locally;
+	int64		tuples_needed;
 	int			gm_nkeys;
 	SortSupport gm_sortkeys;	/* array of length ms_nkeys */
 	struct GMReaderTupleBuffer *gm_tuple_buffers;	/* tuple buffer per reader */
