diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index a189a8efc3..1145a9bdda 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3717,6 +3717,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-enable-incrementalsort" xreflabel="enable_incrementalsort">
+      <term><varname>enable_incrementalsort</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_incrementalsort</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of incremental sort
+        steps. The default is <literal>on</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-indexscan" xreflabel="enable_indexscan">
       <term><varname>enable_indexscan</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 79f639d5e2..da9b030670 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -81,6 +81,8 @@ static void show_upper_qual(List *qual, const char *qlabel,
 				ExplainState *es);
 static void show_sort_keys(SortState *sortstate, List *ancestors,
 			   ExplainState *es);
+static void show_incremental_sort_keys(IncrementalSortState *incrsortstate,
+					   List *ancestors, ExplainState *es);
 static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 					   ExplainState *es);
 static void show_agg_keys(AggState *astate, List *ancestors,
@@ -94,7 +96,7 @@ static void show_grouping_set_keys(PlanState *planstate,
 static void show_group_keys(GroupState *gstate, List *ancestors,
 				ExplainState *es);
 static void show_sort_group_keys(PlanState *planstate, const char *qlabel,
-					 int nkeys, AttrNumber *keycols,
+					 int nkeys, int nPresortedKeys, AttrNumber *keycols,
 					 Oid *sortOperators, Oid *collations, bool *nullsFirst,
 					 List *ancestors, ExplainState *es);
 static void show_sortorder_options(StringInfo buf, Node *sortexpr,
@@ -102,6 +104,8 @@ static void show_sortorder_options(StringInfo buf, Node *sortexpr,
 static void show_tablesample(TableSampleClause *tsc, PlanState *planstate,
 				 List *ancestors, ExplainState *es);
 static void show_sort_info(SortState *sortstate, ExplainState *es);
+static void show_incremental_sort_info(IncrementalSortState *incrsortstate,
+									   ExplainState *es);
 static void show_hash_info(HashState *hashstate, ExplainState *es);
 static void show_tidbitmap_info(BitmapHeapScanState *planstate,
 					ExplainState *es);
@@ -1067,6 +1071,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
 		case T_Sort:
 			pname = sname = "Sort";
 			break;
+		case T_IncrementalSort:
+			pname = sname = "Incremental Sort";
+			break;
 		case T_Group:
 			pname = sname = "Group";
 			break;
@@ -1677,6 +1684,12 @@ ExplainNode(PlanState *planstate, List *ancestors,
 			show_sort_keys(castNode(SortState, planstate), ancestors, es);
 			show_sort_info(castNode(SortState, planstate), es);
 			break;
+		case T_IncrementalSort:
+			show_incremental_sort_keys(castNode(IncrementalSortState, planstate),
+									   ancestors, es);
+			show_incremental_sort_info(castNode(IncrementalSortState, planstate),
+									   es);
+			break;
 		case T_MergeAppend:
 			show_merge_append_keys(castNode(MergeAppendState, planstate),
 								   ancestors, es);
@@ -2006,12 +2019,29 @@ show_sort_keys(SortState *sortstate, List *ancestors, ExplainState *es)
 	Sort	   *plan = (Sort *) sortstate->ss.ps.plan;
 
 	show_sort_group_keys((PlanState *) sortstate, "Sort Key",
-						 plan->numCols, plan->sortColIdx,
+						 plan->numCols, 0, plan->sortColIdx,
 						 plan->sortOperators, plan->collations,
 						 plan->nullsFirst,
 						 ancestors, es);
 }
 
+/*
+ * Show the sort keys for a IncrementalSort node.
+ */
+static void
+show_incremental_sort_keys(IncrementalSortState *incrsortstate,
+						   List *ancestors, ExplainState *es)
+{
+	IncrementalSort *plan = (IncrementalSort *) incrsortstate->ss.ps.plan;
+
+	show_sort_group_keys((PlanState *) incrsortstate, "Sort Key",
+						 plan->sort.numCols, plan->presortedCols,
+						 plan->sort.sortColIdx,
+						 plan->sort.sortOperators, plan->sort.collations,
+						 plan->sort.nullsFirst,
+						 ancestors, es);
+}
+
 /*
  * Likewise, for a MergeAppend node.
  */
@@ -2022,7 +2052,7 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
 	MergeAppend *plan = (MergeAppend *) mstate->ps.plan;
 
 	show_sort_group_keys((PlanState *) mstate, "Sort Key",
-						 plan->numCols, plan->sortColIdx,
+						 plan->numCols, 0, plan->sortColIdx,
 						 plan->sortOperators, plan->collations,
 						 plan->nullsFirst,
 						 ancestors, es);
@@ -2046,7 +2076,7 @@ show_agg_keys(AggState *astate, List *ancestors,
 			show_grouping_sets(outerPlanState(astate), plan, ancestors, es);
 		else
 			show_sort_group_keys(outerPlanState(astate), "Group Key",
-								 plan->numCols, plan->grpColIdx,
+								 plan->numCols, 0, plan->grpColIdx,
 								 NULL, NULL, NULL,
 								 ancestors, es);
 
@@ -2115,7 +2145,7 @@ show_grouping_set_keys(PlanState *planstate,
 	if (sortnode)
 	{
 		show_sort_group_keys(planstate, "Sort Key",
-							 sortnode->numCols, sortnode->sortColIdx,
+							 sortnode->numCols, 0, sortnode->sortColIdx,
 							 sortnode->sortOperators, sortnode->collations,
 							 sortnode->nullsFirst,
 							 ancestors, es);
@@ -2172,7 +2202,7 @@ show_group_keys(GroupState *gstate, List *ancestors,
 	/* The key columns refer to the tlist of the child plan */
 	ancestors = lcons(gstate, ancestors);
 	show_sort_group_keys(outerPlanState(gstate), "Group Key",
-						 plan->numCols, plan->grpColIdx,
+						 plan->numCols, 0, plan->grpColIdx,
 						 NULL, NULL, NULL,
 						 ancestors, es);
 	ancestors = list_delete_first(ancestors);
@@ -2185,13 +2215,14 @@ show_group_keys(GroupState *gstate, List *ancestors,
  */
 static void
 show_sort_group_keys(PlanState *planstate, const char *qlabel,
-					 int nkeys, AttrNumber *keycols,
+					 int nkeys, int nPresortedKeys, AttrNumber *keycols,
 					 Oid *sortOperators, Oid *collations, bool *nullsFirst,
 					 List *ancestors, ExplainState *es)
 {
 	Plan	   *plan = planstate->plan;
 	List	   *context;
 	List	   *result = NIL;
+	List	   *resultPresorted = NIL;
 	StringInfoData sortkeybuf;
 	bool		useprefix;
 	int			keyno;
@@ -2231,9 +2262,13 @@ show_sort_group_keys(PlanState *planstate, const char *qlabel,
 								   nullsFirst[keyno]);
 		/* Emit one property-list item per sort key */
 		result = lappend(result, pstrdup(sortkeybuf.data));
+		if (keyno < nPresortedKeys)
+			resultPresorted = lappend(resultPresorted, exprstr);
 	}
 
 	ExplainPropertyList(qlabel, result, es);
+	if (nPresortedKeys > 0)
+		ExplainPropertyList("Presorted Key", resultPresorted, es);
 }
 
 /*
@@ -2441,6 +2476,95 @@ show_sort_info(SortState *sortstate, ExplainState *es)
 	}
 }
 
+/*
+ * If it's EXPLAIN ANALYZE, show tuplesort stats for a incremental sort node
+ */
+static void
+show_incremental_sort_info(IncrementalSortState *incrsortstate,
+						   ExplainState *es)
+{
+	if (es->analyze && incrsortstate->sort_Done &&
+		incrsortstate->tuplesortstate != NULL)
+	{
+		Tuplesortstate *state = (Tuplesortstate *) incrsortstate->tuplesortstate;
+		TuplesortInstrumentation stats;
+		const char *sortMethod;
+		const char *spaceType;
+		long		spaceUsed;
+
+		tuplesort_get_stats(state, &stats);
+		sortMethod = tuplesort_method_name(stats.sortMethod);
+		spaceType = tuplesort_space_type_name(stats.spaceType);
+		spaceUsed = stats.spaceUsed;
+
+		if (es->format == EXPLAIN_FORMAT_TEXT)
+		{
+			appendStringInfoSpaces(es->str, es->indent * 2);
+			appendStringInfo(es->str, "Sort Method: %s  %s: %ldkB\n",
+							 sortMethod, spaceType, spaceUsed);
+			appendStringInfoSpaces(es->str, es->indent * 2);
+			appendStringInfo(es->str, "Sort Groups: %ld\n",
+							 incrsortstate->group_count);
+		}
+		else
+		{
+			ExplainPropertyText("Sort Method", sortMethod, es);
+			ExplainPropertyInteger("Sort Space Used", "kB", spaceUsed, es);
+			ExplainPropertyText("Sort Space Type", spaceType, es);
+			ExplainPropertyInteger("Sort Groups:", NULL,
+								   incrsortstate->group_count, es);
+		}
+	}
+
+	if (incrsortstate->shared_info != NULL)
+	{
+		int			n;
+		bool		opened_group = false;
+
+		for (n = 0; n < incrsortstate->shared_info->num_workers; n++)
+		{
+			TuplesortInstrumentation *sinstrument;
+			const char *sortMethod;
+			const char *spaceType;
+			long		spaceUsed;
+			int64		group_count;
+
+			sinstrument = &incrsortstate->shared_info->sinfo[n].sinstrument;
+			group_count = incrsortstate->shared_info->sinfo[n].group_count;
+			if (sinstrument->sortMethod == SORT_TYPE_STILL_IN_PROGRESS)
+				continue;		/* ignore any unfilled slots */
+			sortMethod = tuplesort_method_name(sinstrument->sortMethod);
+			spaceType = tuplesort_space_type_name(sinstrument->spaceType);
+			spaceUsed = sinstrument->spaceUsed;
+
+			if (es->format == EXPLAIN_FORMAT_TEXT)
+			{
+				appendStringInfoSpaces(es->str, es->indent * 2);
+				appendStringInfo(es->str,
+								 "Worker %d:  Sort Method: %s  %s: %ldkB  Groups: %ld\n",
+								 n, sortMethod, spaceType, spaceUsed, group_count);
+			}
+			else
+			{
+				if (!opened_group)
+				{
+					ExplainOpenGroup("Workers", "Workers", false, es);
+					opened_group = true;
+				}
+				ExplainOpenGroup("Worker", NULL, true, es);
+				ExplainPropertyInteger("Worker Number", NULL, n, es);
+				ExplainPropertyText("Sort Method", sortMethod, es);
+				ExplainPropertyInteger("Sort Space Used", "kB", spaceUsed, es);
+				ExplainPropertyText("Sort Space Type", spaceType, es);
+				ExplainPropertyInteger("Sort Groups", NULL, group_count, es);
+				ExplainCloseGroup("Worker", NULL, true, es);
+			}
+		}
+		if (opened_group)
+			ExplainCloseGroup("Workers", "Workers", false, es);
+	}
+}
+
 /*
  * Show information on hash buckets/batches.
  */
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 76d87eea49..c2f06da4e5 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -24,8 +24,8 @@ OBJS = execAmi.o execCurrent.o execExpr.o execExprInterp.o \
        nodeLimit.o nodeLockRows.o nodeGatherMerge.o \
        nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
        nodeNestloop.o nodeProjectSet.o nodeRecursiveunion.o nodeResult.o \
-       nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
-       nodeValuesscan.o \
+       nodeSamplescan.o nodeSeqscan.o nodeSetOp.o \
+       nodeSort.o nodeIncrementalSort.o nodeUnique.o nodeValuesscan.o \
        nodeCtescan.o nodeNamedtuplestorescan.o nodeWorktablescan.o \
        nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \
        nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o tqueue.o spi.o \
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 9e78421978..520aeefd83 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -31,6 +31,7 @@
 #include "executor/nodeGroup.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "executor/nodeIncrementalSort.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeLimit.h"
@@ -253,6 +254,10 @@ ExecReScan(PlanState *node)
 			ExecReScanSort((SortState *) node);
 			break;
 
+		case T_IncrementalSortState:
+			ExecReScanIncrementalSort((IncrementalSortState *) node);
+			break;
+
 		case T_GroupState:
 			ExecReScanGroup((GroupState *) node);
 			break;
@@ -525,8 +530,16 @@ ExecSupportsBackwardScan(Plan *node)
 		case T_CteScan:
 		case T_Material:
 		case T_Sort:
+			/* these don't evaluate tlist */
 			return true;
 
+		case T_IncrementalSort:
+			/*
+			 * Unlike full sort, incremental sort keeps only a single group
+			 * of tuples in memory, so it can't scan backwards.
+			 */
+			return false;
+
 		case T_LockRows:
 		case T_Limit:
 			return ExecSupportsBackwardScan(outerPlan(node));
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 52f1a96db5..fc3910502b 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -32,6 +32,7 @@
 #include "executor/nodeForeignscan.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "executor/nodeIncrementalSort.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeSeqscan.h"
@@ -281,6 +282,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortEstimate((SortState *) planstate, e->pcxt);
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
+			ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
+			break;
 
 		default:
 			break;
@@ -494,6 +499,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
+			ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
+			break;
 
 		default:
 			break;
@@ -918,6 +927,7 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 			break;
 		case T_HashState:
 		case T_SortState:
+		case T_IncrementalSortState:
 			/* these nodes have DSM state, but no reinitialization is required */
 			break;
 
@@ -978,6 +988,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 		case T_SortState:
 			ExecSortRetrieveInstrumentation((SortState *) planstate);
 			break;
+		case T_IncrementalSortState:
+			ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
+			break;
 		case T_HashState:
 			ExecHashRetrieveInstrumentation((HashState *) planstate);
 			break;
@@ -1227,6 +1240,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
 			/* even when not parallel-aware, for EXPLAIN ANALYZE */
 			ExecSortInitializeWorker((SortState *) planstate, pwcxt);
 			break;
+		case T_IncrementalSortState:
+			/* even when not parallel-aware, for EXPLAIN ANALYZE */
+			ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
+												pwcxt);
+			break;
 
 		default:
 			break;
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index a3fb4495d2..943ca65372 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -88,6 +88,7 @@
 #include "executor/nodeGroup.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
+#include "executor/nodeIncrementalSort.h"
 #include "executor/nodeIndexonlyscan.h"
 #include "executor/nodeIndexscan.h"
 #include "executor/nodeLimit.h"
@@ -314,6 +315,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
 												estate, eflags);
 			break;
 
+		case T_IncrementalSort:
+			result = (PlanState *) ExecInitIncrementalSort((IncrementalSort *) node,
+														   estate, eflags);
+			break;
+
 		case T_Group:
 			result = (PlanState *) ExecInitGroup((Group *) node,
 												 estate, eflags);
@@ -695,6 +701,10 @@ ExecEndNode(PlanState *node)
 			ExecEndSort((SortState *) node);
 			break;
 
+		case T_IncrementalSortState:
+			ExecEndIncrementalSort((IncrementalSortState *) node);
+			break;
+
 		case T_GroupState:
 			ExecEndGroup((GroupState *) node);
 			break;
diff --git a/src/backend/executor/nodeIncrementalSort.c b/src/backend/executor/nodeIncrementalSort.c
new file mode 100644
index 0000000000..0fbb63d4b2
--- /dev/null
+++ b/src/backend/executor/nodeIncrementalSort.c
@@ -0,0 +1,681 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeIncremenalSort.c
+ *	  Routines to handle incremental sorting of relations.
+ *
+ * DESCRIPTION
+ *
+ *		Incremental sort is an optimized variant of multikey sort for cases
+ *		when the input is already sorted by a prefix of the sort keys.  For
+ *		example when a sort by (key1, key2 ... keyN) is requested, and the
+ *		input is already sorted by (key1, key2 ... keyM), M < N, we can
+ *		divide the input into groups where keys (key1, ... keyM) are equal,
+ *		and only sort on the remaining columns.
+ *
+ *		Consider the following example.  We have input tuples consisting of
+ *		two integers (X, Y) already presorted by X, while it's required to
+ *		sort them by both X and Y.  Let input tuples be following.
+ *
+ *		(1, 5)
+ *		(1, 2)
+ *		(2, 9)
+ *		(2, 1)
+ *		(2, 5)
+ *		(3, 3)
+ *		(3, 7)
+ *
+ *		Incremental sort algorithm would split the input into the following
+ *		groups, which have equal X, and then sort them by Y individually:
+ *
+ *			(1, 5) (1, 2)
+ *			(2, 9) (2, 1) (2, 5)
+ *			(3, 3) (3, 7)
+ *
+ *		After sorting these groups and putting them altogether, we would get
+ *		the following result which is sorted by X and Y, as requested:
+ *
+ *		(1, 2)
+ *		(1, 5)
+ *		(2, 1)
+ *		(2, 5)
+ *		(2, 9)
+ *		(3, 3)
+ *		(3, 7)
+ *
+ *		Incremental sort may be more efficient than plain sort, parcitularly
+ *		on large datasets, as it reduces the amount of data to sort at once,
+ *		making it more likely it fits into work_mem (eliminating the need to
+ *		spill to disk).  But the main advantage of incremental sort is that
+ *		it can start producing rows early, before sorting the whole dataset,
+ *		which is a significant benefit especially for queries with LIMIT.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/nodeIncremenalSort.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "executor/execdebug.h"
+#include "executor/nodeIncrementalSort.h"
+#include "miscadmin.h"
+#include "utils/lsyscache.h"
+#include "utils/tuplesort.h"
+
+/*
+ * Prepare information for presorted_keys comparison.
+ */
+static void
+preparePresortedCols(IncrementalSortState *node)
+{
+	IncrementalSort	   *plannode = (IncrementalSort *) node->ss.ps.plan;
+	int					presortedCols,
+						i;
+
+	Assert(IsA(plannode, IncrementalSort));
+	presortedCols = plannode->presortedCols;
+
+	node->presorted_keys = (PresortedKeyData *) palloc(presortedCols *
+													sizeof(PresortedKeyData));
+
+	for (i = 0; i < presortedCols; i++)
+	{
+		Oid					equalityOp,
+							equalityFunc;
+		PresortedKeyData   *key;
+
+		key = &node->presorted_keys[i];
+		key->attno = plannode->sort.sortColIdx[i];
+
+		equalityOp = get_equality_op_for_ordering_op(
+										plannode->sort.sortOperators[i], NULL);
+		if (!OidIsValid(equalityOp))
+			elog(ERROR, "missing equality operator for ordering operator %u",
+					plannode->sort.sortOperators[i]);
+
+		equalityFunc = get_opcode(equalityOp);
+		if (!OidIsValid(equalityFunc))
+			elog(ERROR, "missing function for operator %u", equalityOp);
+
+		/* Lookup the comparison function */
+		fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);
+
+		/* We can initialize the callinfo just once and re-use it */
+		InitFunctionCallInfoData(key->fcinfo, &key->flinfo, 2,
+								plannode->sort.collations[i], NULL, NULL);
+		key->fcinfo.argnull[0] = false;
+		key->fcinfo.argnull[1] = false;
+	}
+}
+
+/*
+ * Check whether a given tuple belongs to the current sort group.
+ *
+ * We do this by comparing its first 'presortedCols' column values to
+ * the pivot tuple of the current group.
+ *
+ */
+static bool
+isCurrentGroup(IncrementalSortState *node, TupleTableSlot *tupleSlot)
+{
+	int presortedCols, i;
+	TupleTableSlot *group_pivot = node->group_pivot;
+
+	Assert(IsA(node->ss.ps.plan, IncrementalSort));
+
+	presortedCols = ((IncrementalSort *) node->ss.ps.plan)->presortedCols;
+
+	/*
+	 * We do assume the input is sorted by keys (0, ... n), which means
+	 * the tail keys are more likely to change. So we do the comparison
+	 * from the end, to minimize the number of function calls.
+	 */
+	for (i = presortedCols - 1; i >= 0; i--)
+	{
+		Datum				datumA,
+							datumB,
+							result;
+		bool				isnullA,
+							isnullB;
+		AttrNumber			attno = node->presorted_keys[i].attno;
+		PresortedKeyData   *key;
+
+		datumA = slot_getattr(group_pivot, attno, &isnullA);
+		datumB = slot_getattr(tupleSlot, attno, &isnullB);
+
+		/* Special case for NULL-vs-NULL, else use standard comparison */
+		if (isnullA || isnullB)
+		{
+			if (isnullA == isnullB)
+				continue;
+			else
+				return false;
+		}
+
+		key = &node->presorted_keys[i];
+
+		key->fcinfo.arg[0] = datumA;
+		key->fcinfo.arg[1] = datumB;
+
+		/* just for paranoia's sake, we reset isnull each time */
+		key->fcinfo.isnull = false;
+
+		result = FunctionCallInvoke(&key->fcinfo);
+
+		/* Check for null result, since caller is clearly not expecting one */
+		if (key->fcinfo.isnull)
+			elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);
+
+		if (!DatumGetBool(result))
+			return false;
+	}
+	return true;
+}
+
+/*
+ * Sorting many small groups with tuplesort is inefficient. In order to
+ * cope with this problem we don't start a new group until the current one
+ * contains at least DEFAULT_MIN_GROUP_SIZE tuples.  However, in the case
+ * of bounded sort where bound is less than DEFAULT_MIN_GROUP_SIZE we start
+ * looking for the new group when bound is done.
+ */
+#define DEFAULT_MIN_GROUP_SIZE 32
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSort
+ *
+ *		Assuming that outer subtree returns tuple presorted by some prefix
+ *		of target sort columns, performs incremental sort.  It fetches
+ *		groups of tuples where prefix sort columns are equal and sorts them
+ *		using tuplesort.  This approach allows to evade sorting of whole
+ *		dataset.  Besides taking less memory and being faster, it allows to
+ *		start returning tuples before fetching full dataset from outer
+ *		subtree.
+ *
+ *		Conditions:
+ *		  -- none.
+ *
+ *		Initial States:
+ *		  -- the outer child is prepared to return the first tuple.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *
+ExecIncrementalSort(PlanState *pstate)
+{
+	IncrementalSortState *node = castNode(IncrementalSortState, pstate);
+	EState			   *estate;
+	ScanDirection		dir;
+	Tuplesortstate	   *tuplesortstate;
+	TupleTableSlot	   *slot;
+	IncrementalSort	   *plannode = (IncrementalSort *) node->ss.ps.plan;
+	PlanState		   *outerNode;
+	TupleDesc			tupDesc;
+	int64				nTuples = 0;
+	int64				minGroupSize;
+
+	CHECK_FOR_INTERRUPTS();
+
+	/*
+	 * get state info from node
+	 */
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "entering routine");
+
+	estate = node->ss.ps.state;
+	dir = estate->es_direction;
+	tuplesortstate = (Tuplesortstate *) node->tuplesortstate;
+
+	/*
+	 * Return next tuple from the current sorted group set if available.
+	 * If there are no more tuples in the current group, we need to try
+	 * to fetch more tuples from the input and build another group.
+	 */
+	if (node->sort_Done)
+	{
+		slot = node->ss.ps.ps_ResultTupleSlot;
+		if (tuplesort_gettupleslot(tuplesortstate,
+									  ScanDirectionIsForward(dir),
+									  false, slot, NULL) || node->finished)
+			return slot;
+	}
+
+	/*
+	 * First time through or no tuples in the current group. Read next
+	 * batch of tuples from the outer plan and pass them to tuplesort.c.
+	 * Subsequent calls just fetch tuples from tuplesort, until the group
+	 * is exhausted, at which point we build the next group.
+	 */
+
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "sorting subplan");
+
+	/*
+	 * Want to scan subplan in the forward direction while creating the
+	 * sorted data.
+	 */
+	estate->es_direction = ForwardScanDirection;
+
+	outerNode = outerPlanState(node);
+	tupDesc = ExecGetResultType(outerNode);
+
+	/*
+	 * Initialize tuplesort module (needed only before the first group).
+	 */
+	if (node->tuplesortstate == NULL)
+	{
+		/*
+		 * We are going to process the first group of presorted data.
+		 * Initialize support structures for cmpSortPresortedCols - already
+		 * sorted columns.
+		 */
+		preparePresortedCols(node);
+
+		SO1_printf("ExecIncrementalSort: %s\n",
+				   "calling tuplesort_begin_heap");
+
+		/*
+		 * Pass all the columns to tuplesort.  We pass to tuple sort groups
+		 * of at least minGroupSize size.  Thus, these groups doesn't
+		 * necessary have equal value of the first column.
+		 */
+		tuplesortstate = tuplesort_begin_heap(
+									tupDesc,
+									plannode->sort.numCols,
+									plannode->sort.sortColIdx,
+									plannode->sort.sortOperators,
+									plannode->sort.collations,
+									plannode->sort.nullsFirst,
+									work_mem,
+									NULL,
+									false);
+		node->tuplesortstate = (void *) tuplesortstate;
+	}
+	else
+	{
+		/* Next group of presorted data */
+		tuplesort_reset((Tuplesortstate *) node->tuplesortstate);
+	}
+	node->group_count++;
+
+	/*
+	 * Calculate remaining bound for bounded sort and minimal group size
+	 * accordingly.
+	 */
+	if (node->bounded)
+	{
+		tuplesort_set_bound(tuplesortstate, node->bound - node->bound_Done);
+		minGroupSize = Min(DEFAULT_MIN_GROUP_SIZE, node->bound - node->bound_Done);
+	}
+	else
+	{
+		minGroupSize = DEFAULT_MIN_GROUP_SIZE;
+	}
+
+	/* If we got a leftover tuple from the last group, pass it to tuplesort. */
+	if (!TupIsNull(node->group_pivot))
+	{
+		tuplesort_puttupleslot(tuplesortstate, node->group_pivot);
+		ExecClearTuple(node->group_pivot);
+		nTuples++;
+	}
+
+	/*
+	 * Put next group of tuples where presortedCols sort values are equal to
+	 * tuplesort.
+	 */
+	for (;;)
+	{
+		slot = ExecProcNode(outerNode);
+
+		if (TupIsNull(slot))
+		{
+			node->finished = true;
+			break;
+		}
+
+		/*
+		 * Accumulate the next group of presorted tuples for tuplesort.
+		 * We always accumulate at least minGroupSize tuples, and only
+		 * then we start to compare the prefix keys.
+		 *
+		 * The last tuple is kept as a pivot, so that we can determine if
+		 * the subsequent tuples have the same prefix key (same group).
+		 */
+		if (nTuples < minGroupSize)
+		{
+			tuplesort_puttupleslot(tuplesortstate, slot);
+
+			/* Keep the last tuple in minimal group as a pivot. */
+			if (nTuples == minGroupSize - 1)
+				ExecCopySlot(node->group_pivot, slot);
+			nTuples++;
+		}
+		else
+		{
+			/*
+			 * Iterate while presorted cols are the same as in the pivot
+			 * tuple.
+			 *
+			 * After accumulating at least minGroupSize tuples (we don't
+			 * know how many groups are there in that set), we need to keep
+			 * accumulating until we reach the end of the group. Only then
+			 * we can do the sort and output all the tuples.
+			 *
+			 * We compare the prefix keys to the pivot - if the prefix keys
+			 * are the same the tuple belongs to the same group, so we pass
+			 * it to the tuplesort.
+			 *
+			 * If the prefix differs, we've reached the end of the group. We
+			 * need to keep the last tuple, so we copy it into the pivot slot
+			 * (it does not serve as pivot, though).
+			 */
+			if (isCurrentGroup(node, slot))
+			{
+				tuplesort_puttupleslot(tuplesortstate, slot);
+				nTuples++;
+			}
+			else
+			{
+				ExecCopySlot(node->group_pivot, slot);
+				break;
+			}
+		}
+	}
+
+	/*
+	 * Complete the sort.
+	 */
+	tuplesort_performsort(tuplesortstate);
+
+	/*
+	 * restore to user specified direction
+	 */
+	estate->es_direction = dir;
+
+	/*
+	 * finally set the sorted flag to true
+	 */
+	node->sort_Done = true;
+	node->bounded_Done = node->bounded;
+	if (node->shared_info && node->am_worker)
+	{
+		TuplesortInstrumentation *si;
+
+		Assert(IsParallelWorker());
+		Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
+		si = &node->shared_info->sinfo[ParallelWorkerNumber].sinstrument;
+		tuplesort_get_stats(tuplesortstate, si);
+		node->shared_info->sinfo[ParallelWorkerNumber].group_count =
+															node->group_count;
+	}
+
+	/*
+	 * Adjust bound_Done with number of tuples we've actually sorted.
+	 */
+	if (node->bounded)
+	{
+		if (node->finished)
+			node->bound_Done = node->bound;
+		else
+			node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
+	}
+
+	SO1_printf("ExecIncrementalSort: %s\n", "sorting done");
+
+	SO1_printf("ExecIncrementalSort: %s\n",
+			   "retrieving tuple from tuplesort");
+
+	/*
+	 * Get the first or next tuple from tuplesort. Returns NULL if no more
+	 * tuples.
+	 */
+	slot = node->ss.ps.ps_ResultTupleSlot;
+	(void) tuplesort_gettupleslot(tuplesortstate,
+								  ScanDirectionIsForward(dir),
+								  false, slot, NULL);
+	return slot;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecInitIncrementalSort
+ *
+ *		Creates the run-time state information for the sort node
+ *		produced by the planner and initializes its outer subtree.
+ * ----------------------------------------------------------------
+ */
+IncrementalSortState *
+ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags)
+{
+	IncrementalSortState   *incrsortstate;
+
+	SO1_printf("ExecInitIncrementalSort: %s\n",
+			   "initializing sort node");
+
+	/*
+	 * Incremental sort can't be used with either EXEC_FLAG_REWIND,
+	 * EXEC_FLAG_BACKWARD or EXEC_FLAG_MARK, because we hold only current
+	 * bucket in tuplesortstate.
+	 */
+	Assert((eflags & (EXEC_FLAG_REWIND |
+					  EXEC_FLAG_BACKWARD |
+					  EXEC_FLAG_MARK)) == 0);
+
+	/*
+	 * create state structure
+	 */
+	incrsortstate = makeNode(IncrementalSortState);
+	incrsortstate->ss.ps.plan = (Plan *) node;
+	incrsortstate->ss.ps.state = estate;
+	incrsortstate->ss.ps.ExecProcNode = ExecIncrementalSort;
+
+	incrsortstate->bounded = false;
+	incrsortstate->sort_Done = false;
+	incrsortstate->finished = false;
+	incrsortstate->tuplesortstate = NULL;
+	incrsortstate->group_pivot = NULL;
+	incrsortstate->bound_Done = 0;
+	incrsortstate->group_count = 0;
+	incrsortstate->presorted_keys = NULL;
+
+	/*
+	 * Miscellaneous initialization
+	 *
+	 * Sort nodes don't initialize their ExprContexts because they never call
+	 * ExecQual or ExecProject.
+	 */
+
+	/*
+	 * initialize child nodes
+	 *
+	 * We shield the child node from the need to support REWIND, BACKWARD, or
+	 * MARK/RESTORE.
+	 */
+	eflags &= ~(EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK);
+
+	outerPlanState(incrsortstate) = ExecInitNode(outerPlan(node), estate, eflags);
+
+	/*
+	 * Initialize scan slot and type.
+	 */
+	ExecCreateScanSlotFromOuterPlan(estate, &incrsortstate->ss);
+
+	/*
+	 * Initialize return slot and type. No need to initialize projection info because
+	 * this node doesn't do projections.
+	 */
+	ExecInitResultTupleSlotTL(estate, &incrsortstate->ss.ps);
+	incrsortstate->ss.ps.ps_ProjInfo = NULL;
+
+	/* make standalone slot to store previous tuple from outer node */
+	incrsortstate->group_pivot = MakeSingleTupleTableSlot(
+							ExecGetResultType(outerPlanState(incrsortstate)));
+
+	SO1_printf("ExecInitIncrementalSort: %s\n",
+			   "sort node initialized");
+
+	return incrsortstate;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecEndIncrementalSort(node)
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndIncrementalSort(IncrementalSortState *node)
+{
+	SO1_printf("ExecEndIncrementalSort: %s\n",
+			   "shutting down sort node");
+
+	/*
+	 * clean out the tuple table
+	 */
+	ExecClearTuple(node->ss.ss_ScanTupleSlot);
+	/* must drop pointer to sort result tuple */
+	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+	/* must drop stanalone tuple slot from outer node */
+	ExecDropSingleTupleTableSlot(node->group_pivot);
+
+	/*
+	 * Release tuplesort resources
+	 */
+	if (node->tuplesortstate != NULL)
+		tuplesort_end((Tuplesortstate *) node->tuplesortstate);
+	node->tuplesortstate = NULL;
+
+	/*
+	 * shut down the subplan
+	 */
+	ExecEndNode(outerPlanState(node));
+
+	SO1_printf("ExecEndIncrementalSort: %s\n",
+			   "sort node shutdown");
+}
+
+void
+ExecReScanIncrementalSort(IncrementalSortState *node)
+{
+	PlanState  *outerPlan = outerPlanState(node);
+
+	/*
+	 * If we haven't sorted yet, just return. If outerplan's chgParam is not
+	 * NULL then it will be re-scanned by ExecProcNode, else no reason to
+	 * re-scan it at all.
+	 */
+	if (!node->sort_Done)
+		return;
+
+	/* must drop pointer to sort result tuple */
+	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+
+	/*
+	 * If subnode is to be rescanned then we forget previous sort results; we
+	 * have to re-read the subplan and re-sort.  Also must re-sort if the
+	 * bounded-sort parameters changed or we didn't select randomAccess.
+	 *
+	 * Otherwise we can just rewind and rescan the sorted output.
+	 */
+	node->sort_Done = false;
+	tuplesort_end((Tuplesortstate *) node->tuplesortstate);
+	node->tuplesortstate = NULL;
+	node->bound_Done = 0;
+
+	/*
+	 * if chgParam of subnode is not null then plan will be re-scanned by
+	 * first ExecProcNode.
+	 */
+	if (outerPlan->chgParam == NULL)
+		ExecReScan(outerPlan);
+}
+
+/* ----------------------------------------------------------------
+ *						Parallel Query Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecSortEstimate
+ *
+ *		Estimate space required to propagate sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
+{
+	Size		size;
+
+	/* don't need this if not instrumenting or no workers */
+	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+		return;
+
+	size = mul_size(pcxt->nworkers, sizeof(IncrementalSortInfo));
+	size = add_size(size, offsetof(SharedIncrementalSortInfo, sinfo));
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecSortInitializeDSM
+ *
+ *		Initialize DSM space for sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
+{
+	Size		size;
+
+	/* don't need this if not instrumenting or no workers */
+	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+		return;
+
+	size = offsetof(SharedIncrementalSortInfo, sinfo)
+		+ pcxt->nworkers * sizeof(IncrementalSortInfo);
+	node->shared_info = shm_toc_allocate(pcxt->toc, size);
+	/* ensure any unfilled slots will contain zeroes */
+	memset(node->shared_info, 0, size);
+	node->shared_info->num_workers = pcxt->nworkers;
+	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
+				   node->shared_info);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecSortInitializeWorker
+ *
+ *		Attach worker to DSM space for sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
+{
+	node->shared_info =
+		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
+	node->am_worker = true;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecSortRetrieveInstrumentation
+ *
+ *		Transfer sort statistics from DSM to private memory.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
+{
+	Size		size;
+	SharedIncrementalSortInfo *si;
+
+	if (node->shared_info == NULL)
+		return;
+
+	size = offsetof(SharedIncrementalSortInfo, sinfo)
+		+ node->shared_info->num_workers * sizeof(IncrementalSortInfo);
+	si = palloc(size);
+	memcpy(si, node->shared_info, size);
+	node->shared_info = si;
+}
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index 73f16c9aba..bdab33f5c4 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -93,7 +93,8 @@ ExecSort(PlanState *pstate)
 											  plannode->collations,
 											  plannode->nullsFirst,
 											  work_mem,
-											  NULL, node->randomAccess);
+											  NULL,
+											  node->randomAccess);
 		if (node->bounded)
 			tuplesort_set_bound(tuplesortstate, node->bound);
 		node->tuplesortstate = (void *) tuplesortstate;
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 9287baaedc..9b117f7f05 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -924,6 +924,24 @@ _copyMaterial(const Material *from)
 }
 
 
+/*
+ * CopySortFields
+ *
+ *		This function copies the fields of the Sort node.  It is used by
+ *		all the copy functions for classes which inherit from Sort.
+ */
+static void
+CopySortFields(const Sort *from, Sort *newnode)
+{
+	CopyPlanFields((const Plan *) from, (Plan *) newnode);
+
+	COPY_SCALAR_FIELD(numCols);
+	COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
+	COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
+	COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
+	COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
+}
+
 /*
  * _copySort
  */
@@ -935,13 +953,29 @@ _copySort(const Sort *from)
 	/*
 	 * copy node superclass fields
 	 */
-	CopyPlanFields((const Plan *) from, (Plan *) newnode);
+	CopySortFields(from, newnode);
 
-	COPY_SCALAR_FIELD(numCols);
-	COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
-	COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
-	COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
-	COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));
+	return newnode;
+}
+
+
+/*
+ * _copyIncrementalSort
+ */
+static IncrementalSort *
+_copyIncrementalSort(const IncrementalSort *from)
+{
+	IncrementalSort	   *newnode = makeNode(IncrementalSort);
+
+	/*
+	 * copy node superclass fields
+	 */
+	CopySortFields((const Sort *) from, (Sort *) newnode);
+
+	/*
+	 * copy remainder of node
+	 */
+	COPY_SCALAR_FIELD(presortedCols);
 
 	return newnode;
 }
@@ -4900,6 +4934,9 @@ copyObjectImpl(const void *from)
 		case T_Sort:
 			retval = _copySort(from);
 			break;
+		case T_IncrementalSort:
+			retval = _copyIncrementalSort(from);
+			break;
 		case T_Group:
 			retval = _copyGroup(from);
 			break;
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 03a91c3352..51d0e3008c 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -894,12 +894,10 @@ _outMaterial(StringInfo str, const Material *node)
 }
 
 static void
-_outSort(StringInfo str, const Sort *node)
+_outSortInfo(StringInfo str, const Sort *node)
 {
 	int			i;
 
-	WRITE_NODE_TYPE("SORT");
-
 	_outPlanInfo(str, (const Plan *) node);
 
 	WRITE_INT_FIELD(numCols);
@@ -921,6 +919,24 @@ _outSort(StringInfo str, const Sort *node)
 		appendStringInfo(str, " %s", booltostr(node->nullsFirst[i]));
 }
 
+static void
+_outSort(StringInfo str, const Sort *node)
+{
+	WRITE_NODE_TYPE("SORT");
+
+	_outSortInfo(str, node);
+}
+
+static void
+_outIncrementalSort(StringInfo str, const IncrementalSort *node)
+{
+	WRITE_NODE_TYPE("INCREMENTALSORT");
+
+	_outSortInfo(str, (const Sort *) node);
+
+	WRITE_INT_FIELD(presortedCols);
+}
+
 static void
 _outUnique(StringInfo str, const Unique *node)
 {
@@ -3806,6 +3822,9 @@ outNode(StringInfo str, const void *obj)
 			case T_Sort:
 				_outSort(str, obj);
 				break;
+			case T_IncrementalSort:
+				_outIncrementalSort(str, obj);
+				break;
 			case T_Unique:
 				_outUnique(str, obj);
 				break;
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 2812dc9646..ee730bd52d 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -2134,12 +2134,13 @@ _readMaterial(void)
 }
 
 /*
- * _readSort
+ * ReadCommonSort
+ *	Assign the basic stuff of all nodes that inherit from Sort
  */
-static Sort *
-_readSort(void)
+static void
+ReadCommonSort(Sort *local_node)
 {
-	READ_LOCALS(Sort);
+	READ_TEMP_LOCALS();
 
 	ReadCommonPlan(&local_node->plan);
 
@@ -2148,6 +2149,32 @@ _readSort(void)
 	READ_OID_ARRAY(sortOperators, local_node->numCols);
 	READ_OID_ARRAY(collations, local_node->numCols);
 	READ_BOOL_ARRAY(nullsFirst, local_node->numCols);
+}
+
+/*
+ * _readSort
+ */
+static Sort *
+_readSort(void)
+{
+	READ_LOCALS_NO_FIELDS(Sort);
+
+	ReadCommonSort(local_node);
+
+	READ_DONE();
+}
+
+/*
+ * _readIncrementalSort
+ */
+static IncrementalSort *
+_readIncrementalSort(void)
+{
+	READ_LOCALS(IncrementalSort);
+
+	ReadCommonSort(&local_node->sort);
+
+	READ_INT_FIELD(presortedCols);
 
 	READ_DONE();
 }
@@ -2723,6 +2750,8 @@ parseNodeString(void)
 		return_value = _readMaterial();
 	else if (MATCH("SORT", 4))
 		return_value = _readSort();
+	else if (MATCH("INCREMENTALSORT", 15))
+		return_value = _readIncrementalSort();
 	else if (MATCH("GROUP", 5))
 		return_value = _readGroup();
 	else if (MATCH("AGG", 3))
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 65a34a255d..b13f7a68ba 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -3713,6 +3713,10 @@ print_path(PlannerInfo *root, Path *path, int indent)
 			ptype = "Sort";
 			subpath = ((SortPath *) path)->subpath;
 			break;
+		case T_IncrementalSortPath:
+			ptype = "IncrementalSort";
+			subpath = ((SortPath *) path)->subpath;
+			break;
 		case T_GroupPath:
 			ptype = "Group";
 			subpath = ((GroupPath *) path)->subpath;
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 47729de896..f6d4bec556 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -128,6 +128,7 @@ bool		enable_indexonlyscan = true;
 bool		enable_bitmapscan = true;
 bool		enable_tidscan = true;
 bool		enable_sort = true;
+bool		enable_incrementalsort = true;
 bool		enable_hashagg = true;
 bool		enable_nestloop = true;
 bool		enable_material = true;
@@ -1611,9 +1612,9 @@ cost_recursive_union(Path *runion, Path *nrterm, Path *rterm)
 }
 
 /*
- * cost_sort
- *	  Determines and returns the cost of sorting a relation, including
- *	  the cost of reading the input data.
+ * cost_tuplesort
+ *	  Determines and returns the cost of sorting a relation using tuplesort,
+ *    not including the cost of reading the input data.
  *
  * If the total volume of data to sort is less than sort_mem, we will do
  * an in-memory sort, which requires no I/O and about t*log2(t) tuple
@@ -1640,39 +1641,23 @@ cost_recursive_union(Path *runion, Path *nrterm, Path *rterm)
  * specifying nonzero comparison_cost; typically that's used for any extra
  * work that has to be done to prepare the inputs to the comparison operators.
  *
- * 'pathkeys' is a list of sort keys
- * 'input_cost' is the total cost for reading the input data
  * 'tuples' is the number of tuples in the relation
  * 'width' is the average tuple width in bytes
  * 'comparison_cost' is the extra cost per comparison, if any
  * 'sort_mem' is the number of kilobytes of work memory allowed for the sort
  * 'limit_tuples' is the bound on the number of output tuples; -1 if no bound
- *
- * NOTE: some callers currently pass NIL for pathkeys because they
- * can't conveniently supply the sort keys.  Since this routine doesn't
- * currently do anything with pathkeys anyway, that doesn't matter...
- * but if it ever does, it should react gracefully to lack of key data.
- * (Actually, the thing we'd most likely be interested in is just the number
- * of sort keys, which all callers *could* supply.)
  */
-void
-cost_sort(Path *path, PlannerInfo *root,
-		  List *pathkeys, Cost input_cost, double tuples, int width,
+static void
+cost_tuplesort(Cost *startup_cost, Cost *run_cost,
+		  double tuples, int width,
 		  Cost comparison_cost, int sort_mem,
 		  double limit_tuples)
 {
-	Cost		startup_cost = input_cost;
-	Cost		run_cost = 0;
 	double		input_bytes = relation_byte_size(tuples, width);
 	double		output_bytes;
 	double		output_tuples;
 	long		sort_mem_bytes = sort_mem * 1024L;
 
-	if (!enable_sort)
-		startup_cost += disable_cost;
-
-	path->rows = tuples;
-
 	/*
 	 * We want to be sure the cost of a sort is never estimated as zero, even
 	 * if passed-in tuple count is zero.  Besides, mustn't do log(0)...
@@ -1711,7 +1696,7 @@ cost_sort(Path *path, PlannerInfo *root,
 		 *
 		 * Assume about N log2 N comparisons
 		 */
-		startup_cost += comparison_cost * tuples * LOG2(tuples);
+		*startup_cost = comparison_cost * tuples * LOG2(tuples);
 
 		/* Disk costs */
 
@@ -1722,7 +1707,7 @@ cost_sort(Path *path, PlannerInfo *root,
 			log_runs = 1.0;
 		npageaccesses = 2.0 * npages * log_runs;
 		/* Assume 3/4ths of accesses are sequential, 1/4th are not */
-		startup_cost += npageaccesses *
+		*startup_cost += npageaccesses *
 			(seq_page_cost * 0.75 + random_page_cost * 0.25);
 	}
 	else if (tuples > 2 * output_tuples || input_bytes > sort_mem_bytes)
@@ -1733,12 +1718,12 @@ cost_sort(Path *path, PlannerInfo *root,
 		 * factor is a bit higher than for quicksort.  Tweak it so that the
 		 * cost curve is continuous at the crossover point.
 		 */
-		startup_cost += comparison_cost * tuples * LOG2(2.0 * output_tuples);
+		*startup_cost = comparison_cost * tuples * LOG2(2.0 * output_tuples);
 	}
 	else
 	{
 		/* We'll use plain quicksort on all the input tuples */
-		startup_cost += comparison_cost * tuples * LOG2(tuples);
+		*startup_cost = comparison_cost * tuples * LOG2(tuples);
 	}
 
 	/*
@@ -1749,8 +1734,183 @@ cost_sort(Path *path, PlannerInfo *root,
 	 * here --- the upper LIMIT will pro-rate the run cost so we'd be double
 	 * counting the LIMIT otherwise.
 	 */
-	run_cost += cpu_operator_cost * tuples;
+	*run_cost = cpu_operator_cost * tuples;
+}
 
+/*
+ * cost_full_sort
+ * 	Determines and returns the cost of sorting a relation, including the
+ *	cost of reading the input data.
+ *
+ * For the precise description of how the cost is calculated, see the comment
+ * for cost_tuplesort().
+ */
+void
+cost_full_sort(Cost *startup_cost, Cost *run_cost,
+			   Cost input_total_cost, double tuples, int width,
+			   Cost comparison_cost, int sort_mem,
+			   double limit_tuples)
+{
+	cost_tuplesort(startup_cost, run_cost,
+				   tuples, width,
+				   comparison_cost, sort_mem,
+				   limit_tuples);
+
+	if (!enable_sort)
+		*startup_cost += disable_cost;
+
+	*startup_cost += input_total_cost;
+}
+
+/*
+ * cost_incremental_sort
+ * 	Determines and returns the cost of sorting a relation incrementally, when
+ *  the input path is already sorted by some of the pathkeys.
+ *
+ * 'presorted_keys' is the number of leading pathkeys by which the input path
+ * is sorted.
+ *
+ * We estimate the number of groups into which the relation is divided by the
+ * leading pathkeys, and then calculate the cost of sorting a single group
+ * with tuplesort using cost_tuplesort().
+ */
+void
+cost_incremental_sort(Path *path,
+		  PlannerInfo *root, List *pathkeys, int presorted_keys,
+		  Cost input_startup_cost, Cost input_total_cost,
+		  double input_tuples, int width, Cost comparison_cost, int sort_mem,
+		  double limit_tuples)
+{
+	Cost		startup_cost = 0,
+				run_cost = 0,
+				input_run_cost = input_total_cost - input_startup_cost;
+	double		output_tuples,
+				output_groups,
+				group_tuples,
+				input_groups;
+	Cost		group_startup_cost,
+				group_run_cost,
+				group_input_run_cost;
+	List	   *presortedExprs = NIL;
+	ListCell   *l;
+	int			i = 0;
+
+	Assert(presorted_keys != 0);
+
+	if (!enable_sort)
+		startup_cost += disable_cost;
+
+	if (!enable_incrementalsort)
+		startup_cost += disable_cost;
+
+	/*
+	 * We want to be sure the cost of a sort is never estimated as zero, even
+	 * if passed-in tuple count is zero.  Besides, mustn't do log(0)...
+	 */
+	if (input_tuples < 2.0)
+		input_tuples = 2.0;
+
+	/* Extract presorted keys as list of expressions */
+	foreach(l, pathkeys)
+	{
+		PathKey *key = (PathKey *)lfirst(l);
+		EquivalenceMember *member = (EquivalenceMember *)
+						linitial(key->pk_eclass->ec_members);
+
+		presortedExprs = lappend(presortedExprs, member->em_expr);
+
+		i++;
+		if (i >= presorted_keys)
+			break;
+	}
+
+	/* Estimate number of groups with equal presorted keys */
+	input_groups = estimate_num_groups(root, presortedExprs, input_tuples, NULL);
+	group_tuples = input_tuples / input_groups;
+	group_input_run_cost = input_run_cost / input_groups;
+
+	/*
+	 * Estimate average cost of sorting of one group where presorted keys
+	 * are equal.  Incremental sort is sensitive to distribution of tuples
+	 * to the groups, where we're relying on quite rough assumptions.  Thus,
+	 * we're pessimistic about incremental sort performance and increase
+	 * its average group size by half.
+	 */
+	cost_tuplesort(&group_startup_cost, &group_run_cost,
+				   1.5 * group_tuples, width, comparison_cost, sort_mem,
+				   limit_tuples);
+
+	/* If we have a LIMIT, adjust the number of groups we'll have to return. */
+	if (limit_tuples > 0 && limit_tuples < input_tuples)
+	{
+		output_tuples = limit_tuples;
+		output_groups = floor(output_tuples / group_tuples) + 1;
+	}
+	else
+	{
+		output_tuples = input_tuples;
+		output_groups = input_groups;
+	}
+
+	/*
+	 * Startup cost of incremental sort is the startup cost of its first group
+	 * plus the cost of its input.
+	 */
+	startup_cost += group_startup_cost
+		+ input_startup_cost + group_input_run_cost;
+
+	/*
+	 * After we started producing tuples from the first group, the cost of
+	 * producing all the tuples is given by the cost to finish processing
+	 * this group, plus the total cost to process the remaining groups,
+	 * plus the remaining cost of input.
+	 */
+	run_cost += group_run_cost
+		+ (group_run_cost + group_startup_cost) * (output_groups - 1)
+		+ group_input_run_cost * (output_groups - 1);
+
+	/*
+	 * Incremental sort adds some overhead by itself. Firstly, it has to
+	 * detect the sort groups. This is roughly equal to one extra copy and
+	 * comparison per tuple. Secondly, it has to reset the tuplesort context
+	 * for every group.
+	 */
+	run_cost += (cpu_tuple_cost + comparison_cost) * output_tuples;
+	run_cost += 2.0 * cpu_tuple_cost * output_groups;
+
+	path->rows = input_tuples;
+	path->startup_cost = startup_cost;
+	path->total_cost = startup_cost + run_cost;
+}
+
+/*
+ * cost_sort
+ *	  Determines and returns the cost of sorting a relation, including
+ *	  the cost of reading the input data.
+ *
+ * NOTE: some callers currently pass NIL for pathkeys because they
+ * can't conveniently supply the sort keys.  Since this routine doesn't
+ * currently do anything with pathkeys anyway, that doesn't matter...
+ * but if it ever does, it should react gracefully to lack of key data.
+ * (Actually, the thing we'd most likely be interested in is just the number
+ * of sort keys, which all callers *could* supply.)
+ */
+void
+cost_sort(Path *path, PlannerInfo *root,
+		  List *pathkeys, Cost input_cost, double tuples, int width,
+		  Cost comparison_cost, int sort_mem,
+		  double limit_tuples)
+
+{
+	Cost startup_cost;
+	Cost run_cost;
+
+	cost_full_sort(&startup_cost, &run_cost,
+				   input_cost,
+				   tuples, width, comparison_cost, sort_mem,
+				   limit_tuples);
+
+	path->rows = tuples;
 	path->startup_cost = startup_cost;
 	path->total_cost = startup_cost + run_cost;
 }
diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c
index 6d1cc3b8a0..6b2ba366c9 100644
--- a/src/backend/optimizer/path/pathkeys.c
+++ b/src/backend/optimizer/path/pathkeys.c
@@ -327,6 +327,51 @@ pathkeys_contained_in(List *keys1, List *keys2)
 	return false;
 }
 
+
+/*
+ * pathkeys_common_contained_in
+ *    Same as pathkeys_contained_in, but also sets length of longest
+ *    common prefix of keys1 and keys2.
+ */
+bool
+pathkeys_common_contained_in(List *keys1, List *keys2, int *n_common)
+{
+	int			n = 0;
+	ListCell   *key1,
+			   *key2;
+
+	forboth(key1, keys1, key2, keys2)
+	{
+		PathKey    *pathkey1 = (PathKey *) lfirst(key1);
+		PathKey    *pathkey2 = (PathKey *) lfirst(key2);
+
+		if (pathkey1 != pathkey2)
+		{
+			*n_common = n;
+			return false;
+		}
+		n++;
+	}
+
+	*n_common = n;
+	return (key1 == NULL); 
+}
+
+
+/*
+ * pathkeys_common
+ *    Returns length of longest common prefix of keys1 and keys2.
+ */
+int
+pathkeys_common(List *keys1, List *keys2)
+{
+	int		n;
+
+	(void) pathkeys_common_contained_in(keys1, keys2, &n);
+	return n;
+}
+
+
 /*
  * get_cheapest_path_for_pathkeys
  *	  Find the cheapest path (according to the specified criterion) that
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 99d0736029..34b2417c4c 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -96,6 +96,8 @@ static Plan *create_projection_plan(PlannerInfo *root,
 					   int flags);
 static Plan *inject_projection_plan(Plan *subplan, List *tlist, bool parallel_safe);
 static Sort *create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags);
+static IncrementalSort *create_incrementalsort_plan(PlannerInfo *root,
+									IncrementalSortPath *best_path, int flags);
 static Group *create_group_plan(PlannerInfo *root, GroupPath *best_path);
 static Unique *create_upper_unique_plan(PlannerInfo *root, UpperUniquePath *best_path,
 						 int flags);
@@ -245,6 +247,10 @@ static MergeJoin *make_mergejoin(List *tlist,
 static Sort *make_sort(Plan *lefttree, int numCols,
 		  AttrNumber *sortColIdx, Oid *sortOperators,
 		  Oid *collations, bool *nullsFirst);
+static IncrementalSort *make_incrementalsort(Plan *lefttree,
+		  int numCols, int presortedCols,
+		  AttrNumber *sortColIdx, Oid *sortOperators,
+		  Oid *collations, bool *nullsFirst);
 static Plan *prepare_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
 						   Relids relids,
 						   const AttrNumber *reqColIdx,
@@ -259,6 +265,8 @@ static EquivalenceMember *find_ec_member_for_tle(EquivalenceClass *ec,
 					   Relids relids);
 static Sort *make_sort_from_pathkeys(Plan *lefttree, List *pathkeys,
 						Relids relids);
+static IncrementalSort *make_incrementalsort_from_pathkeys(Plan *lefttree,
+						List *pathkeys, Relids relids, int presortedCols);
 static Sort *make_sort_from_groupcols(List *groupcls,
 						 AttrNumber *grpColIdx,
 						 Plan *lefttree);
@@ -458,6 +466,11 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
 											 (SortPath *) best_path,
 											 flags);
 			break;
+		case T_IncrementalSort:
+			plan = (Plan *) create_incrementalsort_plan(root,
+														(IncrementalSortPath *) best_path,
+														flags);
+			break;
 		case T_Group:
 			plan = (Plan *) create_group_plan(root,
 											  (GroupPath *) best_path);
@@ -1741,6 +1754,32 @@ create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags)
 	return plan;
 }
 
+/*
+ * create_incrementalsort_plan
+ *
+ *	  Do the same as create_sort_plan, but create IncrementalSort plan.
+ */
+static IncrementalSort *
+create_incrementalsort_plan(PlannerInfo *root, IncrementalSortPath *best_path,
+							int flags)
+{
+	IncrementalSort	   *plan;
+	Plan			   *subplan;
+
+	/* See comments in create_sort_plan() above */
+	subplan = create_plan_recurse(root, best_path->spath.subpath,
+								  flags | CP_SMALL_TLIST);
+	plan = make_incrementalsort_from_pathkeys(subplan,
+								best_path->spath.path.pathkeys,
+								IS_OTHER_REL(best_path->spath.subpath->parent) ?
+								best_path->spath.path.parent->relids : NULL,
+								best_path->presortedCols);
+
+	copy_generic_path_info(&plan->sort.plan, (Path *) best_path);
+
+	return plan;
+}
+
 /*
  * create_group_plan
  *
@@ -5000,17 +5039,24 @@ static void
 label_sort_with_costsize(PlannerInfo *root, Sort *plan, double limit_tuples)
 {
 	Plan	   *lefttree = plan->plan.lefttree;
-	Path		sort_path;		/* dummy for result of cost_sort */
+	Cost		startup_cost,
+				run_cost;
+
+	/*
+	 * This function shouldn't have to deal with IncrementalSort plans
+	 * because they are only created from corresponding Path nodes.
+	 */
+	Assert(IsA(plan, Sort));
 
-	cost_sort(&sort_path, root, NIL,
+	cost_full_sort(&startup_cost, &run_cost,
 			  lefttree->total_cost,
 			  lefttree->plan_rows,
 			  lefttree->plan_width,
 			  0.0,
 			  work_mem,
 			  limit_tuples);
-	plan->plan.startup_cost = sort_path.startup_cost;
-	plan->plan.total_cost = sort_path.total_cost;
+	plan->plan.startup_cost = startup_cost;
+	plan->plan.total_cost = startup_cost + run_cost;
 	plan->plan.plan_rows = lefttree->plan_rows;
 	plan->plan.plan_width = lefttree->plan_width;
 	plan->plan.parallel_aware = false;
@@ -5597,9 +5643,12 @@ make_sort(Plan *lefttree, int numCols,
 		  AttrNumber *sortColIdx, Oid *sortOperators,
 		  Oid *collations, bool *nullsFirst)
 {
-	Sort	   *node = makeNode(Sort);
-	Plan	   *plan = &node->plan;
+	Sort	   *node;
+	Plan	   *plan;
 
+	node = makeNode(Sort);
+
+	plan = &node->plan;
 	plan->targetlist = lefttree->targetlist;
 	plan->qual = NIL;
 	plan->lefttree = lefttree;
@@ -5613,6 +5662,37 @@ make_sort(Plan *lefttree, int numCols,
 	return node;
 }
 
+/*
+ * make_incrementalsort --- basic routine to build a IncrementalSort plan node
+ *
+ * Caller must have built the sortColIdx, sortOperators, collations, and
+ * nullsFirst arrays already.
+ */
+static IncrementalSort *
+make_incrementalsort(Plan *lefttree, int numCols, int presortedCols,
+		  AttrNumber *sortColIdx, Oid *sortOperators,
+		  Oid *collations, bool *nullsFirst)
+{
+	IncrementalSort	   *node;
+	Plan			   *plan;
+
+	node = makeNode(IncrementalSort);
+
+	plan = &node->sort.plan;
+	plan->targetlist = lefttree->targetlist;
+	plan->qual = NIL;
+	plan->lefttree = lefttree;
+	plan->righttree = NULL;
+	node->presortedCols = presortedCols;
+	node->sort.numCols = numCols;
+	node->sort.sortColIdx = sortColIdx;
+	node->sort.sortOperators = sortOperators;
+	node->sort.collations = collations;
+	node->sort.nullsFirst = nullsFirst;
+
+	return node;
+}
+
 /*
  * prepare_sort_from_pathkeys
  *	  Prepare to sort according to given pathkeys
@@ -5959,6 +6039,42 @@ make_sort_from_pathkeys(Plan *lefttree, List *pathkeys, Relids relids)
 					 collations, nullsFirst);
 }
 
+/*
+ * make_incrementalsort_from_pathkeys
+ *	  Create sort plan to sort according to given pathkeys
+ *
+ *	  'lefttree' is the node which yields input tuples
+ *	  'pathkeys' is the list of pathkeys by which the result is to be sorted
+ *	  'relids' is the set of relations required by prepare_sort_from_pathkeys()
+ *	  'presortedCols' is the number of presorted columns in input tuples
+ */
+static IncrementalSort *
+make_incrementalsort_from_pathkeys(Plan *lefttree, List *pathkeys,
+						Relids relids, int presortedCols)
+{
+	int			numsortkeys;
+	AttrNumber *sortColIdx;
+	Oid		   *sortOperators;
+	Oid		   *collations;
+	bool	   *nullsFirst;
+
+	/* Compute sort column info, and adjust lefttree as needed */
+	lefttree = prepare_sort_from_pathkeys(lefttree, pathkeys,
+										  relids,
+										  NULL,
+										  false,
+										  &numsortkeys,
+										  &sortColIdx,
+										  &sortOperators,
+										  &collations,
+										  &nullsFirst);
+
+	/* Now build the Sort node */
+	return make_incrementalsort(lefttree, numsortkeys, presortedCols,
+								sortColIdx, sortOperators,
+								collations, nullsFirst);
+}
+
 /*
  * make_sort_from_sortclauses
  *	  Create sort plan to sort according to given sortclauses
@@ -6723,6 +6839,7 @@ is_projection_capable_plan(Plan *plan)
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
+		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
 		case T_LockRows:
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 008492bad5..b37c4ff933 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -4840,8 +4840,8 @@ create_distinct_paths(PlannerInfo *root,
  * Build a new upperrel containing Paths for ORDER BY evaluation.
  *
  * All paths in the result must satisfy the ORDER BY ordering.
- * The only new path we need consider is an explicit sort on the
- * cheapest-total existing path.
+ * The only new paths we need consider is an explicit full or
+ * incremental sort on the cheapest-total existing path.
  *
  * input_rel: contains the source-data Paths
  * target: the output tlist the result Paths must emit
@@ -4880,29 +4880,58 @@ create_ordered_paths(PlannerInfo *root,
 
 	foreach(lc, input_rel->pathlist)
 	{
-		Path	   *path = (Path *) lfirst(lc);
+		Path	   *input_path = (Path *) lfirst(lc);
+		Path	   *sorted_path = input_path;
 		bool		is_sorted;
+		int			presorted_keys;
+
+		is_sorted = pathkeys_common_contained_in(root->sort_pathkeys,
+												 input_path->pathkeys, &presorted_keys);
 
-		is_sorted = pathkeys_contained_in(root->sort_pathkeys,
-										  path->pathkeys);
-		if (path == cheapest_input_path || is_sorted)
+		if (is_sorted)
 		{
-			if (!is_sorted)
-			{
-				/* An explicit sort here can take advantage of LIMIT */
-				path = (Path *) create_sort_path(root,
-												 ordered_rel,
-												 path,
-												 root->sort_pathkeys,
-												 limit_tuples);
-			}
+			/* Use the input path as is, but add a projection step if needed */
+			if (sorted_path->pathtarget != target)
+				sorted_path = apply_projection_to_path(root, ordered_rel,
+													   sorted_path, target);
 
+			add_path(ordered_rel, sorted_path);
+		}
+		else if (input_path == cheapest_input_path)
+		{
+			/*
+			 * Sort the cheapest input path. An explicit sort here can take
+			 * advantage of LIMIT.
+			 */
+			sorted_path = (Path *) create_sort_path(root,
+													ordered_rel,
+													input_path,
+													root->sort_pathkeys,
+													limit_tuples);
 			/* Add projection step if needed */
-			if (path->pathtarget != target)
-				path = apply_projection_to_path(root, ordered_rel,
-												path, target);
+			if (sorted_path->pathtarget != target)
+				sorted_path = apply_projection_to_path(root, ordered_rel,
+													   sorted_path, target);
 
-			add_path(ordered_rel, path);
+			add_path(ordered_rel, sorted_path);
+
+			/* Also consider incremental sort. */
+			if (presorted_keys > 0)
+			{
+				sorted_path = (Path *) create_incremental_sort_path(root,
+																	ordered_rel,
+																	input_path,
+																	root->sort_pathkeys,
+																	presorted_keys,
+																	limit_tuples);
+
+				/* Add projection step if needed */
+				if (sorted_path->pathtarget != target)
+					sorted_path = apply_projection_to_path(root, ordered_rel,
+														   sorted_path, target);
+
+				add_path(ordered_rel, sorted_path);
+			}
 		}
 	}
 
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index 833a92f538..af0b720067 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -642,6 +642,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
+		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
 
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index 83008d7661..313cad266f 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -2795,6 +2795,7 @@ finalize_plan(PlannerInfo *root, Plan *plan,
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
+		case T_IncrementalSort:
 		case T_Unique:
 		case T_SetOp:
 		case T_Group:
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 416b3f9578..dfee78c43e 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -2593,6 +2593,57 @@ create_set_projection_path(PlannerInfo *root,
 	return pathnode;
 }
 
+/*
+ * create_incremental_sort_path
+ *	  Creates a pathnode that represents performing an incremental sort.
+ *
+ * 'rel' is the parent relation associated with the result
+ * 'subpath' is the path representing the source of data
+ * 'pathkeys' represents the desired sort order
+ * 'presorted_keys' is the number of keys by which the input path is
+ *		already sorted
+ * 'limit_tuples' is the estimated bound on the number of output tuples,
+ *		or -1 if no LIMIT or couldn't estimate
+ */
+SortPath *
+create_incremental_sort_path(PlannerInfo *root,
+				 RelOptInfo *rel,
+				 Path *subpath,
+				 List *pathkeys,
+				 int presorted_keys,
+				 double limit_tuples)
+{
+	IncrementalSortPath *sort = makeNode(IncrementalSortPath);
+	SortPath *pathnode = &sort->spath;
+
+	pathnode->path.pathtype = T_IncrementalSort;
+	pathnode->path.parent = rel;
+	/* Sort doesn't project, so use source path's pathtarget */
+	pathnode->path.pathtarget = subpath->pathtarget;
+	/* For now, assume we are above any joins, so no parameterization */
+	pathnode->path.param_info = NULL;
+	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_safe = rel->consider_parallel &&
+		subpath->parallel_safe;
+	pathnode->path.parallel_workers = subpath->parallel_workers;
+	pathnode->path.pathkeys = pathkeys;
+
+	pathnode->subpath = subpath;
+
+	cost_incremental_sort(&pathnode->path,
+			  root, pathkeys, presorted_keys,
+			  subpath->startup_cost,
+			  subpath->total_cost,
+			  subpath->rows,
+			  subpath->pathtarget->width,
+			  0.0,				/* XXX comparison_cost shouldn't be 0? */
+			  work_mem, limit_tuples);
+
+	sort->presortedCols = presorted_keys;
+
+	return pathnode;
+}
+
 /*
  * create_sort_path
  *	  Creates a pathnode that represents performing an explicit sort.
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 71c2b4eff1..060790198a 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -873,6 +873,15 @@ static struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_incrementalsort", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of incremental sort steps."),
+			NULL
+		},
+		&enable_incrementalsort,
+		true,
+		NULL, NULL, NULL
+	},
 	{
 		{"enable_hashagg", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables the planner's use of hashed aggregation plans."),
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index e433faad86..029c43b1d5 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -125,6 +125,15 @@
 #define PARALLEL_SORT(state)	((state)->shared == NULL ? 0 : \
 								 (state)->worker >= 0 ? 1 : 2)
 
+/*
+ * Initial size of memtuples array.  We're trying to select this size so that
+ * array don't exceed ALLOCSET_SEPARATE_THRESHOLD and overhead of allocation
+ * be possible less.  However, we don't cosider array sizes less than 1024
+ * 
+ */
+#define INITIAL_MEMTUPSIZE Max(1024, \
+	ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1)
+
 /* GUC variables */
 #ifdef TRACE_SORT
 bool		trace_sort = false;
@@ -243,6 +252,14 @@ struct Tuplesortstate
 	int64		allowedMem;		/* total memory allowed, in bytes */
 	int			maxTapes;		/* number of tapes (Knuth's T) */
 	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
+	int64		maxSpace;		/* maximum amount of space occupied among sort
+								   of groups, either in-memory or on-disk */
+	bool		maxSpaceOnDisk;	/* true when maxSpace is value for on-disk
+								   space, false when it's value for in-memory
+								   space */
+	TupSortStatus maxSpaceStatus; /* sort status when maxSpace was reached */
+	MemoryContext maincontext;	/* memory context for tuple sort metadata
+								   that persist across multiple batches */
 	MemoryContext sortcontext;	/* memory context holding most sort data */
 	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
@@ -647,6 +664,8 @@ static void worker_freeze_result_tape(Tuplesortstate *state);
 static void worker_nomergeruns(Tuplesortstate *state);
 static void leader_takeover_tapes(Tuplesortstate *state);
 static void free_sort_tuple(Tuplesortstate *state, SortTuple *stup);
+static void tuplesort_free(Tuplesortstate *state);
+static void tuplesort_updatemax(Tuplesortstate *state);
 
 /*
  * Special versions of qsort just for SortTuple objects.  qsort_tuple() sorts
@@ -682,6 +701,7 @@ tuplesort_begin_common(int workMem, SortCoordinate coordinate,
 					   bool randomAccess)
 {
 	Tuplesortstate *state;
+	MemoryContext maincontext;
 	MemoryContext sortcontext;
 	MemoryContext tuplecontext;
 	MemoryContext oldcontext;
@@ -691,13 +711,21 @@ tuplesort_begin_common(int workMem, SortCoordinate coordinate,
 		elog(ERROR, "random access disallowed under parallel sort");
 
 	/*
-	 * Create a working memory context for this sort operation. All data
-	 * needed by the sort will live inside this context.
+	 * Memory context surviving tuplesort_reset.  This memory context holds
+	 * data which is useful to keep while sorting multiple similar batches.
 	 */
-	sortcontext = AllocSetContextCreate(CurrentMemoryContext,
+	maincontext = AllocSetContextCreate(CurrentMemoryContext,
 										"TupleSort main",
 										ALLOCSET_DEFAULT_SIZES);
 
+	/*
+	 * Create a working memory context for one sort operation.  The content of
+	 * this context is deleted by tuplesort_reset.
+	 */
+	sortcontext = AllocSetContextCreate(maincontext,
+										"TupleSort sort",
+										ALLOCSET_DEFAULT_SIZES);
+
 	/*
 	 * Caller tuple (e.g. IndexTuple) memory context.
 	 *
@@ -715,7 +743,7 @@ tuplesort_begin_common(int workMem, SortCoordinate coordinate,
 	 * Make the Tuplesortstate within the per-sort context.  This way, we
 	 * don't need a separate pfree() operation for it at shutdown.
 	 */
-	oldcontext = MemoryContextSwitchTo(sortcontext);
+	oldcontext = MemoryContextSwitchTo(maincontext);
 
 	state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
 
@@ -740,6 +768,7 @@ tuplesort_begin_common(int workMem, SortCoordinate coordinate,
 	state->availMem = state->allowedMem;
 	state->sortcontext = sortcontext;
 	state->tuplecontext = tuplecontext;
+	state->maincontext = maincontext;
 	state->tapeset = NULL;
 
 	state->memtupcount = 0;
@@ -748,9 +777,7 @@ tuplesort_begin_common(int workMem, SortCoordinate coordinate,
 	 * Initial size of array must be more than ALLOCSET_SEPARATE_THRESHOLD;
 	 * see comments in grow_memtuples().
 	 */
-	state->memtupsize = Max(1024,
-							ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
-
+	state->memtupsize = INITIAL_MEMTUPSIZE;
 	state->growmemtuples = true;
 	state->slabAllocatorUsed = false;
 	state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
@@ -814,7 +841,7 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 	MemoryContext oldcontext;
 	int			i;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 	AssertArg(nkeys > 0);
 
@@ -890,7 +917,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
 
 	Assert(indexRel->rd_rel->relam == BTREE_AM_OID);
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -985,7 +1012,7 @@ tuplesort_begin_index_btree(Relation heapRel,
 	MemoryContext oldcontext;
 	int			i;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -1064,7 +1091,7 @@ tuplesort_begin_index_hash(Relation heapRel,
 												   randomAccess);
 	MemoryContext oldcontext;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -1107,7 +1134,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
 	int16		typlen;
 	bool		typbyval;
 
-	oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	oldcontext = MemoryContextSwitchTo(state->maincontext);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
@@ -1224,16 +1251,12 @@ tuplesort_set_bound(Tuplesortstate *state, int64 bound)
 }
 
 /*
- * tuplesort_end
- *
- *	Release resources and clean up.
+ * tuplesort_free
  *
- * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
- * pointing to garbage.  Be careful not to attempt to use or free such
- * pointers afterwards!
+ *	Internal routine for freeing resources of tuplesort.
  */
-void
-tuplesort_end(Tuplesortstate *state)
+static void
+tuplesort_free(Tuplesortstate *state)
 {
 	/* context swap probably not needed, but let's be safe */
 	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
@@ -1294,7 +1317,111 @@ tuplesort_end(Tuplesortstate *state)
 	 * Free the per-sort memory context, thereby releasing all working memory,
 	 * including the Tuplesortstate struct itself.
 	 */
-	MemoryContextDelete(state->sortcontext);
+	MemoryContextReset(state->sortcontext);
+}
+
+/*
+ * tuplesort_end
+ *
+ *	Release resources and clean up.
+ *
+ * NOTE: after calling this, any pointers returned by tuplesort_getXXX are
+ * pointing to garbage.  Be careful not to attempt to use or free such
+ * pointers afterwards!
+ */
+void
+tuplesort_end(Tuplesortstate *state)
+{
+	tuplesort_free(state);
+	MemoryContextDelete(state->maincontext);
+}
+
+/*
+ * tuplesort_updatemax
+ *
+ *	Update maximum resource usage statistics.
+ */
+static void
+tuplesort_updatemax(Tuplesortstate *state)
+{
+	int64	spaceUsed;
+	bool	spaceUsedOnDisk;
+
+	/*
+	 * Note: it might seem we should provide both memory and disk usage for a
+	 * disk-based sort.  However, the current code doesn't track memory space
+	 * accurately once we have begun to return tuples to the caller (since we
+	 * don't account for pfree's the caller is expected to do), so we cannot
+	 * rely on availMem in a disk sort.  This does not seem worth the overhead
+	 * to fix.  Is it worth creating an API for the memory context code to
+	 * tell us how much is actually used in sortcontext?
+	 */
+	if (state->tapeset)
+	{
+		spaceUsedOnDisk = true;
+		spaceUsed = LogicalTapeSetBlocks(state->tapeset) * BLCKSZ;
+	}
+	else
+	{
+		spaceUsedOnDisk = false;
+		spaceUsed = state->allowedMem - state->availMem;
+	}
+
+	/*
+	 * Sort evicts data to the disk when it didn't manage to fit those data
+	 * to the main memory.  This is why we assume space used on the disk to
+	 * be more important for tracking resource usage than space used in memory.
+	 * Note that amount of space occupied by some tuple set on the disk might
+	 * be less than amount of space occupied by the same tuple set in the
+	 * memory due to more compact representation.
+	 */
+	if ((spaceUsedOnDisk && !state->maxSpaceOnDisk) ||
+		(spaceUsedOnDisk == state->maxSpaceOnDisk && spaceUsed > state->maxSpace))
+	{
+		state->maxSpace = spaceUsed;
+		state->maxSpaceOnDisk = spaceUsedOnDisk;
+		state->maxSpaceStatus = state->status;
+	}
+}
+
+/*
+ * tuplesort_reset
+ *
+ *	Reset the tuplesort.  Reset all the data in the tuplesort, but leave the
+ *	meta-information in.  After tuplesort_reset, tuplesort is ready to start
+ *	a new sort.  It allows evade recreation of tuple sort (and save resources)
+ *	when sorting multiple small batches.
+ */
+void
+tuplesort_reset(Tuplesortstate *state)
+{
+	tuplesort_updatemax(state);
+	tuplesort_free(state);
+
+	state->status = TSS_INITIAL;
+	state->memtupcount = 0;
+	state->boundUsed = false;
+	state->tapeset = NULL;
+	state->currentRun = 0;
+	state->result_tape = -1;
+	state->bounded = false;
+	state->availMem = state->allowedMem;
+	state->lastReturnedTuple = NULL;
+	state->slabAllocatorUsed = false;
+	state->slabMemoryBegin = NULL;
+	state->slabMemoryEnd = NULL;
+	state->slabFreeHead = NULL;
+	state->growmemtuples = true;
+
+	if (state->memtupsize < INITIAL_MEMTUPSIZE)
+	{
+		if (state->memtuples)
+			pfree(state->memtuples);
+		state->memtuples = (SortTuple *) palloc(INITIAL_MEMTUPSIZE * sizeof(SortTuple));
+		state->memtupsize = INITIAL_MEMTUPSIZE;
+	}
+
+	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
 }
 
 /*
@@ -2591,8 +2718,7 @@ mergeruns(Tuplesortstate *state)
 	 * Reset tuple memory.  We've freed all the tuples that we previously
 	 * allocated.  We will use the slab allocator from now on.
 	 */
-	MemoryContextDelete(state->tuplecontext);
-	state->tuplecontext = NULL;
+	MemoryContextResetOnly(state->tuplecontext);
 
 	/*
 	 * We no longer need a large memtuples array.  (We will allocate a smaller
@@ -2642,7 +2768,8 @@ mergeruns(Tuplesortstate *state)
 	 * from each input tape.
 	 */
 	state->memtupsize = numInputTapes;
-	state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
+	state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext,
+										numInputTapes * sizeof(SortTuple));
 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
 
 	/*
@@ -3139,18 +3266,15 @@ tuplesort_get_stats(Tuplesortstate *state,
 	 * to fix.  Is it worth creating an API for the memory context code to
 	 * tell us how much is actually used in sortcontext?
 	 */
-	if (state->tapeset)
-	{
+	tuplesort_updatemax(state);
+
+	if (state->maxSpaceOnDisk)
 		stats->spaceType = SORT_SPACE_TYPE_DISK;
-		stats->spaceUsed = LogicalTapeSetBlocks(state->tapeset) * (BLCKSZ / 1024);
-	}
 	else
-	{
 		stats->spaceType = SORT_SPACE_TYPE_MEMORY;
-		stats->spaceUsed = (state->allowedMem - state->availMem + 1023) / 1024;
-	}
+	stats->spaceUsed = (state->maxSpace + 1023) / 1024;
 
-	switch (state->status)
+	switch (state->maxSpaceStatus)
 	{
 		case TSS_SORTEDINMEM:
 			if (state->boundUsed)
diff --git a/src/include/executor/nodeIncrementalSort.h b/src/include/executor/nodeIncrementalSort.h
new file mode 100644
index 0000000000..90d7a81711
--- /dev/null
+++ b/src/include/executor/nodeIncrementalSort.h
@@ -0,0 +1,30 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeIncrementalSort.h
+ *
+ *
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/executor/nodeIncrementalSort.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef NODEINCREMENTALSORT_H
+#define NODEINCREMENTALSORT_H
+
+#include "access/parallel.h"
+#include "nodes/execnodes.h"
+
+extern IncrementalSortState *ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags);
+extern void ExecEndIncrementalSort(IncrementalSortState *node);
+extern void ExecReScanIncrementalSort(IncrementalSortState *node);
+
+/* parallel instrumentation support */
+extern void ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt);
+extern void ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt);
+extern void ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pcxt);
+extern void ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node);
+
+#endif   /* NODEINCREMENTALSORT_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 538e679cdf..88f18e3701 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1876,6 +1876,20 @@ typedef struct MaterialState
 	Tuplestorestate *tuplestorestate;
 } MaterialState;
 
+
+/* ----------------
+ *	 When performing sorting by multiple keys input dataset could be already
+ *	 presorted by some prefix of these keys.  We call them "presorted keys".
+ *	 PresortedKeyData represents information about one such key.
+ * ----------------
+ */
+typedef struct PresortedKeyData
+{
+	FmgrInfo				flinfo;	/* comparison function info */
+	FunctionCallInfoData	fcinfo;	/* comparison function call info */
+	OffsetNumber			attno;	/* attribute number in tuple */
+} PresortedKeyData;
+
 /* ----------------
  *	 Shared memory container for per-worker sort information
  * ----------------
@@ -1904,6 +1918,46 @@ typedef struct SortState
 	SharedSortInfo *shared_info;	/* one entry per worker */
 } SortState;
 
+/* ----------------
+ *	 Shared memory container for per-worker incremental sort information
+ * ----------------
+ */
+typedef struct IncrementalSortInfo
+{
+	TuplesortInstrumentation	sinstrument;
+	int64						group_count;
+} IncrementalSortInfo;
+
+typedef struct SharedIncrementalSortInfo
+{
+	int							num_workers;
+	IncrementalSortInfo			sinfo[FLEXIBLE_ARRAY_MEMBER];
+} SharedIncrementalSortInfo;
+
+/* ----------------
+ *	 IncrementalSortState information
+ * ----------------
+ */
+typedef struct IncrementalSortState
+{
+	ScanState	ss;				/* its first field is NodeTag */
+	bool		bounded;		/* is the result set bounded? */
+	int64		bound;			/* if bounded, how many tuples are needed */
+	bool		sort_Done;		/* sort completed yet? */
+	bool		finished;		/* fetching tuples from outer node
+								   is finished ? */
+	bool		bounded_Done;	/* value of bounded we did the sort with */
+	int64		bound_Done;		/* value of bound we did the sort with */
+	void	   *tuplesortstate; /* private state of tuplesort.c */
+	/* the keys by which the input path is already sorted */
+	PresortedKeyData *presorted_keys;
+	int64		group_count;	/* number of groups with equal presorted keys */
+	/* slot for pivot tuple defining values of presorted keys within group */
+	TupleTableSlot *group_pivot;
+	bool		am_worker;		/* are we a worker? */
+	SharedIncrementalSortInfo *shared_info;	/* one entry per worker */
+} IncrementalSortState;
+
 /* ---------------------
  *	GroupState information
  * ---------------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 4fc2de7184..cf9e2e64f9 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -74,6 +74,7 @@ typedef enum NodeTag
 	T_HashJoin,
 	T_Material,
 	T_Sort,
+	T_IncrementalSort,
 	T_Group,
 	T_Agg,
 	T_WindowAgg,
@@ -127,6 +128,7 @@ typedef enum NodeTag
 	T_HashJoinState,
 	T_MaterialState,
 	T_SortState,
+	T_IncrementalSortState,
 	T_GroupState,
 	T_AggState,
 	T_WindowAggState,
@@ -245,6 +247,7 @@ typedef enum NodeTag
 	T_ProjectionPath,
 	T_ProjectSetPath,
 	T_SortPath,
+	T_IncrementalSortPath,
 	T_GroupPath,
 	T_UpperUniquePath,
 	T_AggPath,
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 0a797f0a05..81f1844574 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -757,6 +757,17 @@ typedef struct Sort
 	bool	   *nullsFirst;		/* NULLS FIRST/LAST directions */
 } Sort;
 
+
+/* ----------------
+ *		incremental sort node
+ * ----------------
+ */
+typedef struct IncrementalSort
+{
+	Sort		sort;
+	int			presortedCols;	/* number of presorted columns */
+} IncrementalSort;
+
 /* ---------------
  *	 group node -
  *		Used for queries with GROUP BY (but no aggregates) specified.
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index acb8814924..75569203f3 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -1534,6 +1534,15 @@ typedef struct SortPath
 	Path	   *subpath;		/* path representing input source */
 } SortPath;
 
+/*
+ * IncrementalSortPath
+ */
+typedef struct IncrementalSortPath
+{
+	SortPath	spath;
+	int			presortedCols;	/* number of presorted columns */
+} IncrementalSortPath;
+
 /*
  * GroupPath represents grouping (of presorted input)
  *
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index d3269eae71..13b1c80632 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -61,6 +61,7 @@ extern PGDLLIMPORT bool enable_indexonlyscan;
 extern PGDLLIMPORT bool enable_bitmapscan;
 extern PGDLLIMPORT bool enable_tidscan;
 extern PGDLLIMPORT bool enable_sort;
+extern PGDLLIMPORT bool enable_incrementalsort;
 extern PGDLLIMPORT bool enable_hashagg;
 extern PGDLLIMPORT bool enable_nestloop;
 extern PGDLLIMPORT bool enable_material;
@@ -109,6 +110,15 @@ extern void cost_sort(Path *path, PlannerInfo *root,
 		  List *pathkeys, Cost input_cost, double tuples, int width,
 		  Cost comparison_cost, int sort_mem,
 		  double limit_tuples);
+extern void cost_full_sort(Cost *startup_cost, Cost *run_cost,
+			   Cost input_total_cost, double tuples, int width,
+			   Cost comparison_cost, int sort_mem,
+			   double limit_tuples);
+extern void cost_incremental_sort(Path *path,
+		  PlannerInfo *root, List *pathkeys, int presorted_keys,
+		  Cost input_startup_cost, Cost input_total_cost,
+		  double input_tuples, int width, Cost comparison_cost, int sort_mem,
+		  double limit_tuples);
 extern void cost_append(AppendPath *path);
 extern void cost_merge_append(Path *path, PlannerInfo *root,
 				  List *pathkeys, int n_streams,
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index 895bf6959d..72da4cec08 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -170,6 +170,12 @@ extern ProjectSetPath *create_set_projection_path(PlannerInfo *root,
 						   RelOptInfo *rel,
 						   Path *subpath,
 						   PathTarget *target);
+extern SortPath *create_incremental_sort_path(PlannerInfo *root,
+				 RelOptInfo *rel,
+				 Path *subpath,
+				 List *pathkeys,
+				 int presorted_keys,
+				 double limit_tuples);
 extern SortPath *create_sort_path(PlannerInfo *root,
 				 RelOptInfo *rel,
 				 Path *subpath,
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index 50e180c554..3285a8055b 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -189,6 +189,8 @@ typedef enum
 
 extern PathKeysComparison compare_pathkeys(List *keys1, List *keys2);
 extern bool pathkeys_contained_in(List *keys1, List *keys2);
+extern bool pathkeys_common_contained_in(List *keys1, List *keys2, int *n_common);
+extern int pathkeys_common(List *keys1, List *keys2);
 extern Path *get_cheapest_path_for_pathkeys(List *paths, List *pathkeys,
 							   Relids required_outer,
 							   CostSelector cost_criterion,
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index d2e6754f04..4cad0d4fc2 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -192,8 +192,7 @@ extern Tuplesortstate *tuplesort_begin_heap(TupleDesc tupDesc,
 					 int nkeys, AttrNumber *attNums,
 					 Oid *sortOperators, Oid *sortCollations,
 					 bool *nullsFirstFlags,
-					 int workMem, SortCoordinate coordinate,
-					 bool randomAccess);
+					 int workMem, SortCoordinate coordinate, bool randomAccess);
 extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc,
 						Relation indexRel, int workMem,
 						SortCoordinate coordinate, bool randomAccess);
@@ -240,6 +239,8 @@ extern bool tuplesort_skiptuples(Tuplesortstate *state, int64 ntuples,
 
 extern void tuplesort_end(Tuplesortstate *state);
 
+extern void tuplesort_reset(Tuplesortstate *state);
+
 extern void tuplesort_get_stats(Tuplesortstate *state,
 					TuplesortInstrumentation *stats);
 extern const char *tuplesort_method_name(TuplesortMethod m);
diff --git a/src/test/isolation/expected/drop-index-concurrently-1.out b/src/test/isolation/expected/drop-index-concurrently-1.out
index 75dff56bc4..e11fb617b5 100644
--- a/src/test/isolation/expected/drop-index-concurrently-1.out
+++ b/src/test/isolation/expected/drop-index-concurrently-1.out
@@ -19,9 +19,10 @@ Sort
 step explains: EXPLAIN (COSTS OFF) EXECUTE getrow_seq;
 QUERY PLAN     
 
-Sort           
+Incremental Sort
   Sort Key: id, data
-  ->  Seq Scan on test_dc
+  Presorted Key: id
+  ->  Index Scan using test_dc_pkey on test_dc
         Filter: ((data)::text = '34'::text)
 step select2: SELECT * FROM test_dc WHERE data=34 ORDER BY id,data;
 id             data           
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
new file mode 100644
index 0000000000..fa7fb23319
--- /dev/null
+++ b/src/test/regress/expected/incremental_sort.out
@@ -0,0 +1,45 @@
+-- When we have to sort the entire table, incremental sort will
+-- be slower than plain sort, so it should not be used.
+explain (costs off)
+select * from (select * from tenk1 order by four) t order by four, ten;
+            QUERY PLAN             
+-----------------------------------
+ Sort
+   Sort Key: tenk1.four, tenk1.ten
+   ->  Sort
+         Sort Key: tenk1.four
+         ->  Seq Scan on tenk1
+(5 rows)
+
+-- When there is a LIMIT clause, incremental sort is beneficial because
+-- it only has to sort some of the groups, and not the entire table.
+explain (costs off)
+select * from (select * from tenk1 order by four) t order by four, ten
+limit 1;
+               QUERY PLAN                
+-----------------------------------------
+ Limit
+   ->  Incremental Sort
+         Sort Key: tenk1.four, tenk1.ten
+         Presorted Key: tenk1.four
+         ->  Sort
+               Sort Key: tenk1.four
+               ->  Seq Scan on tenk1
+(7 rows)
+
+-- When work_mem is not enough to sort the entire table, incremental sort
+-- may be faster if individual groups still fit into work_mem.
+set work_mem to '2MB';
+explain (costs off)
+select * from (select * from tenk1 order by four) t order by four, ten;
+            QUERY PLAN             
+-----------------------------------
+ Incremental Sort
+   Sort Key: tenk1.four, tenk1.ten
+   Presorted Key: tenk1.four
+   ->  Sort
+         Sort Key: tenk1.four
+         ->  Seq Scan on tenk1
+(6 rows)
+
+reset work_mem;
diff --git a/src/test/regress/expected/partition_aggregate.out b/src/test/regress/expected/partition_aggregate.out
index 76a8209ec2..b7b65fc62d 100644
--- a/src/test/regress/expected/partition_aggregate.out
+++ b/src/test/regress/expected/partition_aggregate.out
@@ -8,6 +8,8 @@ SET enable_partitionwise_aggregate TO true;
 SET enable_partitionwise_join TO true;
 -- Disable parallel plans.
 SET max_parallel_workers_per_gather TO 0;
+-- Disable incremental sort, which can influence selected plans due to fuzz factor.
+SET enable_incrementalsort TO off;
 --
 -- Tests for list partitioned tables.
 --
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index a19ee08749..9dec75060d 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -76,6 +76,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_gathermerge             | on
  enable_hashagg                 | on
  enable_hashjoin                | on
+ enable_incrementalsort         | on
  enable_indexonlyscan           | on
  enable_indexscan               | on
  enable_material                | on
@@ -88,7 +89,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan                 | on
  enable_sort                    | on
  enable_tidscan                 | on
-(16 rows)
+(17 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 00c324dd44..1e0d6f4b62 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -84,7 +84,7 @@ test: select_into select_distinct select_distinct_on select_implicit select_havi
 # ----------
 # Another group of parallel tests
 # ----------
-test: brin gin gist spgist privileges init_privs security_label collate matview lock replica_identity rowsecurity object_address tablesample groupingsets drop_operator password func_index merge
+test: brin gin gist spgist privileges init_privs security_label collate matview lock replica_identity rowsecurity object_address tablesample groupingsets drop_operator password func_index merge incremental_sort
 
 # ----------
 # Another group of parallel tests
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index 39c3fa9c85..c43abdf1fc 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -90,6 +90,7 @@ test: select_distinct_on
 test: select_implicit
 test: select_having
 test: subselect
+test: incremental_sort
 test: union
 test: case
 test: join
diff --git a/src/test/regress/sql/incremental_sort.sql b/src/test/regress/sql/incremental_sort.sql
new file mode 100644
index 0000000000..bd66228ada
--- /dev/null
+++ b/src/test/regress/sql/incremental_sort.sql
@@ -0,0 +1,18 @@
+-- When we have to sort the entire table, incremental sort will
+-- be slower than plain sort, so it should not be used.
+explain (costs off)
+select * from (select * from tenk1 order by four) t order by four, ten;
+
+-- When there is a LIMIT clause, incremental sort is beneficial because
+-- it only has to sort some of the groups, and not the entire table.
+explain (costs off)
+select * from (select * from tenk1 order by four) t order by four, ten
+limit 1;
+
+-- When work_mem is not enough to sort the entire table, incremental sort
+-- may be faster if individual groups still fit into work_mem.
+set work_mem to '2MB';
+explain (costs off)
+select * from (select * from tenk1 order by four) t order by four, ten;
+reset work_mem;
+
diff --git a/src/test/regress/sql/partition_aggregate.sql b/src/test/regress/sql/partition_aggregate.sql
index c60d7d2342..1b05456316 100644
--- a/src/test/regress/sql/partition_aggregate.sql
+++ b/src/test/regress/sql/partition_aggregate.sql
@@ -9,6 +9,8 @@ SET enable_partitionwise_aggregate TO true;
 SET enable_partitionwise_join TO true;
 -- Disable parallel plans.
 SET max_parallel_workers_per_gather TO 0;
+-- Disable incremental sort, which can influence selected plans due to fuzz factor.
+SET enable_incrementalsort TO off;
 
 --
 -- Tests for list partitioned tables.
