 contrib/postgres_fdw/expected/postgres_fdw.out | 51 +++++++------------
 contrib/postgres_fdw/postgres_fdw.c            | 56 ++++++++++++++++++++-
 src/backend/executor/nodeLimit.c               | 68 ++------------------------
 src/backend/executor/nodeMergeAppend.c         |  6 ++-
 src/backend/executor/nodeResult.c              | 25 ++++++++++
 src/backend/executor/nodeSort.c                | 10 ++++
 src/include/nodes/execnodes.h                  |  3 ++
 7 files changed, 118 insertions(+), 101 deletions(-)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 785f520..c3f9433 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -1003,18 +1003,15 @@ SELECT t1.c1, t2.c1 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c3, t
 -- join three tables
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT t1.c1, t2.c2, t3.c3 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) JOIN ft4 t3 ON (t3.c1 = t1.c1) ORDER BY t1.c3, t1.c1 OFFSET 10 LIMIT 10;
-                                                                                            QUERY PLAN                                                                                             
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+                                                                                                                     QUERY PLAN                                                                                                                     
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  Limit
    Output: t1.c1, t2.c2, t3.c3, t1.c3
-   ->  Sort
+   ->  Foreign Scan
          Output: t1.c1, t2.c2, t3.c3, t1.c3
-         Sort Key: t1.c3, t1.c1
-         ->  Foreign Scan
-               Output: t1.c1, t2.c2, t3.c3, t1.c3
-               Relations: ((public.ft1 t1) INNER JOIN (public.ft2 t2)) INNER JOIN (public.ft4 t3)
-               Remote SQL: SELECT r1."C 1", r1.c3, r2.c2, r4.c3 FROM (("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (((r1."C 1" = r2."C 1")))) INNER JOIN "S 1"."T 3" r4 ON (((r1."C 1" = r4.c1))))
-(9 rows)
+         Relations: ((public.ft1 t1) INNER JOIN (public.ft2 t2)) INNER JOIN (public.ft4 t3)
+         Remote SQL: SELECT r1."C 1", r1.c3, r2.c2, r4.c3 FROM (("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (((r1."C 1" = r2."C 1")))) INNER JOIN "S 1"."T 3" r4 ON (((r1."C 1" = r4.c1)))) ORDER BY r1.c3 ASC NULLS LAST, r1."C 1" ASC NULLS LAST
+(6 rows)
 
 SELECT t1.c1, t2.c2, t3.c3 FROM ft1 t1 JOIN ft2 t2 ON (t1.c1 = t2.c1) JOIN ft4 t3 ON (t3.c1 = t1.c1) ORDER BY t1.c3, t1.c1 OFFSET 10 LIMIT 10;
  c1 | c2 |   c3   
@@ -1477,18 +1474,15 @@ SELECT t1.c1, t2.c2, t3.c3 FROM ft2 t1 LEFT JOIN ft2 t2 ON (t1.c1 = t2.c1) RIGHT
 -- full outer join + WHERE clause, only matched rows
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT t1.c1, t2.c1 FROM ft4 t1 FULL JOIN ft5 t2 ON (t1.c1 = t2.c1) WHERE (t1.c1 = t2.c1 OR t1.c1 IS NULL) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
-                                                                            QUERY PLAN                                                                            
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
+                                                                                                   QUERY PLAN                                                                                                   
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
  Limit
    Output: t1.c1, t2.c1
-   ->  Sort
+   ->  Foreign Scan
          Output: t1.c1, t2.c1
-         Sort Key: t1.c1, t2.c1
-         ->  Foreign Scan
-               Output: t1.c1, t2.c1
-               Relations: (public.ft4 t1) FULL JOIN (public.ft5 t2)
-               Remote SQL: SELECT r1.c1, r2.c1 FROM ("S 1"."T 3" r1 FULL JOIN "S 1"."T 4" r2 ON (((r1.c1 = r2.c1)))) WHERE (((r1.c1 = r2.c1) OR (r1.c1 IS NULL)))
-(9 rows)
+         Relations: (public.ft4 t1) FULL JOIN (public.ft5 t2)
+         Remote SQL: SELECT r1.c1, r2.c1 FROM ("S 1"."T 3" r1 FULL JOIN "S 1"."T 4" r2 ON (((r1.c1 = r2.c1)))) WHERE (((r1.c1 = r2.c1) OR (r1.c1 IS NULL))) ORDER BY r1.c1 ASC NULLS LAST, r2.c1 ASC NULLS LAST
+(6 rows)
 
 SELECT t1.c1, t2.c1 FROM ft4 t1 FULL JOIN ft5 t2 ON (t1.c1 = t2.c1) WHERE (t1.c1 = t2.c1 OR t1.c1 IS NULL) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
  c1 | c1 
@@ -1804,24 +1798,15 @@ SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2
 -- CROSS JOIN, not pushed down
 EXPLAIN (VERBOSE, COSTS OFF)
 SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
-                             QUERY PLAN                              
----------------------------------------------------------------------
+                                                                            QUERY PLAN                                                                             
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------
  Limit
    Output: t1.c1, t2.c1
-   ->  Sort
+   ->  Foreign Scan
          Output: t1.c1, t2.c1
-         Sort Key: t1.c1, t2.c1
-         ->  Nested Loop
-               Output: t1.c1, t2.c1
-               ->  Foreign Scan on public.ft1 t1
-                     Output: t1.c1
-                     Remote SQL: SELECT "C 1" FROM "S 1"."T 1"
-               ->  Materialize
-                     Output: t2.c1
-                     ->  Foreign Scan on public.ft2 t2
-                           Output: t2.c1
-                           Remote SQL: SELECT "C 1" FROM "S 1"."T 1"
-(15 rows)
+         Relations: (public.ft1 t1) INNER JOIN (public.ft2 t2)
+         Remote SQL: SELECT r1."C 1", r2."C 1" FROM ("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) ORDER BY r1."C 1" ASC NULLS LAST, r2."C 1" ASC NULLS LAST
+(6 rows)
 
 SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
  c1 | c1  
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index fbe6929..4631f63 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -2428,6 +2428,17 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
 	if (es->verbose)
 	{
 		sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
+
+		/* LIMIT clause is attached on execution time */
+		if (node->ss.ps.ps_numTuples > 0)
+		{
+			StringInfoData	buf;
+
+			initStringInfo(&buf);
+			appendStringInfo(&buf, "%s LIMIT %lu",
+							 sql, node->ss.ps.ps_numTuples);
+			sql = buf.data;
+		}
 		ExplainPropertyText("Remote SQL", sql, es);
 	}
 }
@@ -2493,6 +2504,7 @@ estimate_path_cost_size(PlannerInfo *root,
 						Cost *p_startup_cost, Cost *p_total_cost)
 {
 	PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
+	double		limit_tuples = -1.0;
 	double		rows;
 	double		retrieved_rows;
 	int			width;
@@ -2501,6 +2513,18 @@ estimate_path_cost_size(PlannerInfo *root,
 	Cost		cpu_per_tuple;
 
 	/*
+	 * LIMIT clause can be pushed down only when this foreign-path does not
+	 * need to join any other base relations, and the supplied path-keys
+	 * strictly match with the final output order.
+	 * Elsewhere, pushdown of LIMIT clause makes incorrect results, thus,
+	 * estimated cost don't assume remote LIMIT execution.
+	 */
+	if (root->limit_tuples >= 0.0 &&
+		equal(root->query_pathkeys, pathkeys) &&
+		bms_equal(root->all_baserels, foreignrel->relids))
+		limit_tuples = root->limit_tuples;
+
+	/*
 	 * If the table or the server is configured to use remote estimates,
 	 * connect to the foreign server and execute EXPLAIN to estimate the
 	 * number of rows selected by the restriction+join clauses.  Otherwise,
@@ -2553,6 +2577,12 @@ estimate_path_cost_size(PlannerInfo *root,
 		deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
 								remote_conds, pathkeys, &retrieved_attrs,
 								NULL);
+		/*
+		 * Attach LIMIT if this path is top-level and constant value is
+		 * specified.
+		 */
+		if (limit_tuples >= 0.0)
+			appendStringInfo(&sql, " LIMIT %.0f", limit_tuples);
 
 		/* Get the remote estimate */
 		conn = GetConnection(fpinfo->user, false);
@@ -2627,6 +2657,12 @@ estimate_path_cost_size(PlannerInfo *root,
 
 			/* Estimate of number of rows in cross product */
 			nrows = fpinfo_i->rows * fpinfo_o->rows;
+			if (limit_tuples >= 0.0)
+			{
+				nrows = Min(nrows, limit_tuples);
+				rows = Min(rows, limit_tuples);
+			}
+
 			/* Clamp retrieved rows estimate to at most size of cross product */
 			retrieved_rows = Min(retrieved_rows, nrows);
 
@@ -2724,6 +2760,10 @@ estimate_path_cost_size(PlannerInfo *root,
 			/*
 			 * Number of rows expected from foreign server will be same as
 			 * that of number of groups.
+			 *
+			 * MEMO: root->limit_tuples is not attached when query contains
+			 * grouping-clause or aggregate functions. So, we don't adjust
+			 * rows even if LIMIT <const> is supplied.
 			 */
 			rows = retrieved_rows = numGroups;
 
@@ -2754,8 +2794,16 @@ estimate_path_cost_size(PlannerInfo *root,
 		}
 		else
 		{
+			double		nrows = foreignrel->tuples;
+
+			if (limit_tuples >= 0.0)
+			{
+				nrows = Min(nrows, limit_tuples);
+				rows = Min(rows, limit_tuples);
+			}
+
 			/* Clamp retrieved rows estimates to at most foreignrel->tuples. */
-			retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
+			retrieved_rows = Min(retrieved_rows, nrows);
 
 			/*
 			 * Cost as though this were a seqscan, which is pessimistic.  We
@@ -2768,7 +2816,7 @@ estimate_path_cost_size(PlannerInfo *root,
 
 			startup_cost += foreignrel->baserestrictcost.startup;
 			cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
-			run_cost += cpu_per_tuple * foreignrel->tuples;
+			run_cost += cpu_per_tuple * nrows;
 		}
 
 		/*
@@ -2942,6 +2990,10 @@ create_cursor(ForeignScanState *node)
 	appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
 					 fsstate->cursor_number, fsstate->query);
 
+	/* Append LIMIT if numTuples were passed-down */
+	if (node->ss.ps.ps_numTuples > 0)
+		appendStringInfo(&buf, " LIMIT %ld", node->ss.ps.ps_numTuples);
+
 	/*
 	 * Notice that we pass NULL for paramTypes, thus forcing the remote server
 	 * to infer types for all parameters.  Since we explicitly cast every
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index faf32e1..df3fe01 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -26,7 +26,6 @@
 #include "nodes/nodeFuncs.h"
 
 static void recompute_limits(LimitState *node);
-static void pass_down_bound(LimitState *node, PlanState *child_node);
 
 
 /* ----------------------------------------------------------------
@@ -232,6 +231,7 @@ static void
 recompute_limits(LimitState *node)
 {
 	ExprContext *econtext = node->ps.ps_ExprContext;
+	uint64		tuples_needed;
 	Datum		val;
 	bool		isNull;
 
@@ -296,70 +296,8 @@ recompute_limits(LimitState *node)
 	node->lstate = LIMIT_RESCAN;
 
 	/* Notify child node about limit, if useful */
-	pass_down_bound(node, outerPlanState(node));
-}
-
-/*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort.  Also, if our input is a MergeAppend, we can apply the
- * same bound to any Sorts that are direct children of the MergeAppend,
- * since the MergeAppend surely need read no more than that many tuples from
- * any one input.  We also have to be prepared to look through a Result,
- * since the planner might stick one atop MergeAppend for projection purposes.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters.  If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
- */
-static void
-pass_down_bound(LimitState *node, PlanState *child_node)
-{
-	if (IsA(child_node, SortState))
-	{
-		SortState  *sortState = (SortState *) child_node;
-		int64		tuples_needed = node->count + node->offset;
-
-		/* negative test checks for overflow in sum */
-		if (node->noCount || tuples_needed < 0)
-		{
-			/* make sure flag gets reset if needed upon rescan */
-			sortState->bounded = false;
-		}
-		else
-		{
-			sortState->bounded = true;
-			sortState->bound = tuples_needed;
-		}
-	}
-	else if (IsA(child_node, MergeAppendState))
-	{
-		MergeAppendState *maState = (MergeAppendState *) child_node;
-		int			i;
-
-		for (i = 0; i < maState->ms_nplans; i++)
-			pass_down_bound(node, maState->mergeplans[i]);
-	}
-	else if (IsA(child_node, ResultState))
-	{
-		/*
-		 * An extra consideration here is that if the Result is projecting a
-		 * targetlist that contains any SRFs, we can't assume that every input
-		 * tuple generates an output tuple, so a Sort underneath might need to
-		 * return more than N tuples to satisfy LIMIT N. So we cannot use
-		 * bounded sort.
-		 *
-		 * If Result supported qual checking, we'd have to punt on seeing a
-		 * qual, too.  Note that having a resconstantqual is not a
-		 * showstopper: if that fails we're not getting any rows at all.
-		 */
-		if (outerPlanState(child_node) &&
-			!expression_returns_set((Node *) child_node->plan->targetlist))
-			pass_down_bound(node, outerPlanState(child_node));
-	}
+	tuples_needed = (node->noCount ? 0 : node->count + node->offset);
+	outerPlanState(node)->ps_numTuples = tuples_needed;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index e271927..91707c6 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -174,10 +174,14 @@ ExecMergeAppend(MergeAppendState *node)
 		/*
 		 * First time through: pull the first tuple from each subplan, and set
 		 * up the heap.
+		 * Also, pass down the required number of tuples
 		 */
 		for (i = 0; i < node->ms_nplans; i++)
 		{
-			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+			PlanState  *pstate = node->mergeplans[i];
+
+			pstate->ps_numTuples = node->ps.ps_numTuples;
+			node->ms_slots[i] = ExecProcNode(pstate);
 			if (!TupIsNull(node->ms_slots[i]))
 				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
 		}
diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c
index 4007b76..9e83eb2 100644
--- a/src/backend/executor/nodeResult.c
+++ b/src/backend/executor/nodeResult.c
@@ -47,6 +47,7 @@
 
 #include "executor/executor.h"
 #include "executor/nodeResult.h"
+#include "nodes/nodeFuncs.h"
 #include "utils/memutils.h"
 
 
@@ -75,6 +76,28 @@ ExecResult(ResultState *node)
 	econtext = node->ps.ps_ExprContext;
 
 	/*
+	 * Pass down the number of required tuples by the upper node
+	 *
+	 * An extra consideration here is that if the Result is projecting a
+	 * targetlist that contains any SRFs, we can't assume that every input
+	 * tuple generates an output tuple, so a Sort underneath might need to
+	 * return more than N tuples to satisfy LIMIT N. So we cannot use
+	 * bounded sort.
+	 *
+	 * If Result supported qual checking, we'd have to punt on seeing a
+	 * qual, too.  Note that having a resconstantqual is not a
+	 * showstopper: if that fails we're not getting any rows at all.
+	 */
+	if (!node->rs_started)
+	{
+		if (outerPlanState(node) &&
+			!expression_returns_set((Node *) node->ps.plan->targetlist))
+			outerPlanState(node)->ps_numTuples = node->ps.ps_numTuples;
+
+		node->rs_started = true;
+	}
+
+	/*
 	 * check constant qualifications like (2 > 1), if not already done
 	 */
 	if (node->rs_checkqual)
@@ -218,6 +241,7 @@ ExecInitResult(Result *node, EState *estate, int eflags)
 	resstate->ps.plan = (Plan *) node;
 	resstate->ps.state = estate;
 
+	resstate->rs_started = false;
 	resstate->rs_done = false;
 	resstate->rs_checkqual = (node->resconstantqual == NULL) ? false : true;
 
@@ -294,6 +318,7 @@ ExecEndResult(ResultState *node)
 void
 ExecReScanResult(ResultState *node)
 {
+	node->rs_started = false;
 	node->rs_done = false;
 	node->ps.ps_TupFromTlist = false;
 	node->rs_checkqual = (node->resconstantqual == NULL) ? false : true;
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index a34dcc5..6ec7abb 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -66,6 +66,16 @@ ExecSort(SortState *node)
 
 		SO1_printf("ExecSort: %s\n",
 				   "sorting subplan");
+		/*
+		 * Check bounds according to the required number of tuples
+		 */
+		if (node->ss.ps.ps_numTuples == 0)
+			node->bounded = false;
+		else
+		{
+			node->bounded = true;
+			node->bound = node->ss.ps.ps_numTuples;
+		}
 
 		/*
 		 * Want to scan subplan in the forward direction while creating the
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 8004d85..9b3aed8 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1066,6 +1066,8 @@ typedef struct PlanState
 	ProjectionInfo *ps_ProjInfo;	/* info for doing tuple projection */
 	bool		ps_TupFromTlist;/* state flag for processing set-valued
 								 * functions in targetlist */
+	uint64		ps_numTuples;	/* number of tuples required by the upper
+								 * node if any. 0 means all the tuples */
 } PlanState;
 
 /* ----------------
@@ -1114,6 +1116,7 @@ typedef struct ResultState
 {
 	PlanState	ps;				/* its first field is NodeTag */
 	ExprState  *resconstantqual;
+	bool		rs_started;		/* are we already called? */
 	bool		rs_done;		/* are we done? */
 	bool		rs_checkqual;	/* do we need to check the qual? */
 } ResultState;
