Attached 5th version of the patches, where minor refactoring of distance
handling was done (see below).

On 02.07.2018 19:06, Alexander Korotkov wrote:

Hi!

On Fri, Jun 29, 2018 at 5:37 PM Nikita Glukhov<n.glu...@postgrespro.ru>  wrote:
On 06.03.2018 17:30, David Steele wrote:

I agree with Andres.  Pushing this patch to the next CF.
Attached 4th version of the patches rebased onto the current master.
Nothing interesting has changed from the previous version.
I took a look at this patchset.  In general, it looks good to me, but
I have the following notes about it.

* We're going to replace the scan stack with a pairing heap not only for
KNN-search, but for regular search as well.  Did you verify that it
doesn't cause a regression for the regular search case, because insertion
into a pairing heap might be slower than insertion into a stack?  One may
say that it didn't cause a regression in GiST, and that's why it
shouldn't cause a regression in SP-GiST.  However, SP-GiST trees might
be much higher and these performance aspects might be different.

I decided to bring back the List-based scan stack in the previous version
of the patches, and here is what spgAddSearchItemToQueue() looks like now:

static void
spgAddSearchItemToQueue(SpGistScanOpaque so, SpGistSearchItem *item)
{
    if (so->queue)
        pairingheap_add(so->queue, &item->phNode);
    else
        so->scanStack = lcons(item, so->scanStack);
}

so->queue is initialized in spgrescan() only for ordered searches.

* I think null handling requires better explanation. Nulls are
specially handled in pairingheap_SpGistSearchItem_cmp().  At the same
time you explicitly set infinity distances for leaf nulls.  You
probably have reasons to implement it this way, but I think this
should be explained.  Also, the isnull property of SpGistSearchItem doesn't
have a comment.

Distances for NULL items are expected to be NULL (this would not be true for
non-strict distance operators, so we do not seem to support them here),
and so NULL items are considered to be greater than any non-NULL items in
pairingheap_SpGistSearchItem_cmp().  Distances are copied into SpGistSearchItem
only if it is a non-NULL item.  Also in the last version of the patch I have
introduced spgAllocSearchItem() which conditionally allocates the distance array
in SpGistSearchItem.  Now infinity distances are used only in one place, but
if we require that innerConsistent() should always return distances, then
we can completely get rid of the so->infDistances field.
* I think KNN support should be briefly described in
src/backend/access/spgist/README.

A minimal description written by the original author Vlad Sterzhanov
is already present in the README.

--
Nikita Glukhov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/src/backend/utils/adt/geo_ops.c b/src/backend/utils/adt/geo_ops.c
index f57380a..a4345ef 100644
--- a/src/backend/utils/adt/geo_ops.c
+++ b/src/backend/utils/adt/geo_ops.c
@@ -41,7 +41,6 @@ enum path_delim
 static int	point_inside(Point *p, int npts, Point *plist);
 static int	lseg_crossing(double x, double y, double px, double py);
 static BOX *box_construct(double x1, double x2, double y1, double y2);
-static BOX *box_fill(BOX *result, double x1, double x2, double y1, double y2);
 static bool box_ov(BOX *box1, BOX *box2);
 static double box_ht(BOX *box);
 static double box_wd(BOX *box);
@@ -451,7 +450,7 @@ box_construct(double x1, double x2, double y1, double y2)
 
 /*		box_fill		-		fill in a given box struct
  */
-static BOX *
+BOX *
 box_fill(BOX *result, double x1, double x2, double y1, double y2)
 {
 	if (x1 > x2)
@@ -482,7 +481,7 @@ box_fill(BOX *result, double x1, double x2, double y1, double y2)
 /*		box_copy		-		copy a box
  */
 BOX *
-box_copy(BOX *box)
+box_copy(const BOX *box)
 {
 	BOX		   *result = (BOX *) palloc(sizeof(BOX));
 
diff --git a/src/include/utils/geo_decls.h b/src/include/utils/geo_decls.h
index a589e42..94806c2 100644
--- a/src/include/utils/geo_decls.h
+++ b/src/include/utils/geo_decls.h
@@ -182,6 +182,7 @@ typedef struct
 extern double point_dt(Point *pt1, Point *pt2);
 extern double point_sl(Point *pt1, Point *pt2);
 extern double pg_hypot(double x, double y);
-extern BOX *box_copy(BOX *box);
+extern BOX *box_copy(const BOX *box);
+extern BOX *box_fill(BOX *box, double xlo, double xhi, double ylo, double yhi);
 
 #endif							/* GEO_DECLS_H */
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index c4e8a3b..9279756 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -14,9 +14,9 @@
  */
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/gist_private.h"
 #include "access/relscan.h"
-#include "catalog/pg_type.h"
 #include "miscadmin.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
@@ -543,7 +543,6 @@ getNextNearest(IndexScanDesc scan)
 {
 	GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
 	bool		res = false;
-	int			i;
 
 	if (scan->xs_hitup)
 	{
@@ -564,45 +563,10 @@ getNextNearest(IndexScanDesc scan)
 			/* found a heap item at currently minimal distance */
 			scan->xs_ctup.t_self = item->data.heap.heapPtr;
 			scan->xs_recheck = item->data.heap.recheck;
-			scan->xs_recheckorderby = item->data.heap.recheckDistances;
-			for (i = 0; i < scan->numberOfOrderBys; i++)
-			{
-				if (so->orderByTypes[i] == FLOAT8OID)
-				{
-#ifndef USE_FLOAT8_BYVAL
-					/* must free any old value to avoid memory leakage */
-					if (!scan->xs_orderbynulls[i])
-						pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
-#endif
-					scan->xs_orderbyvals[i] = Float8GetDatum(item->distances[i]);
-					scan->xs_orderbynulls[i] = false;
-				}
-				else if (so->orderByTypes[i] == FLOAT4OID)
-				{
-					/* convert distance function's result to ORDER BY type */
-#ifndef USE_FLOAT4_BYVAL
-					/* must free any old value to avoid memory leakage */
-					if (!scan->xs_orderbynulls[i])
-						pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
-#endif
-					scan->xs_orderbyvals[i] = Float4GetDatum((float4) item->distances[i]);
-					scan->xs_orderbynulls[i] = false;
-				}
-				else
-				{
-					/*
-					 * If the ordering operator's return value is anything
-					 * else, we don't know how to convert the float8 bound
-					 * calculated by the distance function to that.  The
-					 * executor won't actually need the order by values we
-					 * return here, if there are no lossy results, so only
-					 * insist on converting if the *recheck flag is set.
-					 */
-					if (scan->xs_recheckorderby)
-						elog(ERROR, "GiST operator family's FOR ORDER BY operator must return float8 or float4 if the distance function is lossy");
-					scan->xs_orderbynulls[i] = true;
-				}
-			}
+
+			index_store_float8_orderby_distances(scan, so->orderByTypes,
+												 item->distances,
+												 item->data.heap.recheckDistances);
 
 			/* in an index-only scan, also return the reconstructed tuple. */
 			if (scan->xs_want_itup)
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 22b5cc9..6dac90c 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -74,6 +74,7 @@
 #include "access/transam.h"
 #include "access/xlog.h"
 #include "catalog/index.h"
+#include "catalog/pg_type.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
@@ -897,3 +898,72 @@ index_getprocinfo(Relation irel,
 
 	return locinfo;
 }
+
+/* ----------------
+ *		index_store_float8_orderby_distances
+ *
+ *		Convert the AM's float8 distances (possibly inexact) to the ORDER BY
+ *		result types and save them into xs_orderbyvals/xs_orderbynulls for
+ *		a possible recheck; a NULL distances array stores all-NULL values.
+ * ----------------
+ */
+void
+index_store_float8_orderby_distances(IndexScanDesc scan, Oid *orderByTypes,
+									 double *distances, bool recheckOrderBy)
+{
+	int			i;
+
+	scan->xs_recheckorderby = recheckOrderBy;
+
+	if (!distances)
+	{
+		Assert(!scan->xs_recheckorderby);
+
+		for (i = 0; i < scan->numberOfOrderBys; i++)
+		{
+			scan->xs_orderbyvals[i] = (Datum) 0;
+			scan->xs_orderbynulls[i] = true;
+		}
+
+		return;
+	}
+
+	for (i = 0; i < scan->numberOfOrderBys; i++)
+	{
+		if (orderByTypes[i] == FLOAT8OID)
+		{
+#ifndef USE_FLOAT8_BYVAL
+			/* must free any old value to avoid memory leakage */
+			if (!scan->xs_orderbynulls[i])
+				pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
+#endif
+			scan->xs_orderbyvals[i] = Float8GetDatum(distances[i]);
+			scan->xs_orderbynulls[i] = false;
+		}
+		else if (orderByTypes[i] == FLOAT4OID)
+		{
+			/* convert distance function's result to ORDER BY type */
+#ifndef USE_FLOAT4_BYVAL
+			/* must free any old value to avoid memory leakage */
+			if (!scan->xs_orderbynulls[i])
+				pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
+#endif
+			scan->xs_orderbyvals[i] = Float4GetDatum((float4) distances[i]);
+			scan->xs_orderbynulls[i] = false;
+		}
+		else
+		{
+			/*
+			 * If the ordering operator's return value is anything
+			 * else, we don't know how to convert the float8 bound
+			 * calculated by the distance function to that.  The
+			 * executor won't actually need the order by values we
+			 * return here, if there are no lossy results, so only
+			 * insist on converting if the *recheck flag is set.
+			 */
+			if (scan->xs_recheckorderby)
+				elog(ERROR, "ORDER BY operator must return float8 or float4 if the distance function is lossy");
+			scan->xs_orderbynulls[i] = true;
+		}
+	}
+}
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index 24c720b..534fac7 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -174,6 +174,9 @@ extern RegProcedure index_getprocid(Relation irel, AttrNumber attnum,
 				uint16 procnum);
 extern FmgrInfo *index_getprocinfo(Relation irel, AttrNumber attnum,
 				  uint16 procnum);
+extern void index_store_float8_orderby_distances(IndexScanDesc scan,
+									 Oid *orderByTypes, double *distances,
+									 bool recheckOrderBy);
 
 /*
  * index access method support routines (in genam.c)
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index 55cccd2..8a8333b 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -24,6 +24,7 @@
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
 #include "utils/syscache.h"
+#include "utils/lsyscache.h"
 
 
 /*
@@ -872,12 +873,6 @@ gistproperty(Oid index_oid, int attno,
 			 IndexAMProperty prop, const char *propname,
 			 bool *res, bool *isnull)
 {
-	HeapTuple	tuple;
-	Form_pg_index rd_index PG_USED_FOR_ASSERTS_ONLY;
-	Form_pg_opclass rd_opclass;
-	Datum		datum;
-	bool		disnull;
-	oidvector  *indclass;
 	Oid			opclass,
 				opfamily,
 				opcintype;
@@ -911,44 +906,21 @@ gistproperty(Oid index_oid, int attno,
 	}
 
 	/* First we need to know the column's opclass. */
-
-	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
-	if (!HeapTupleIsValid(tuple))
+	opclass = get_index_column_opclass(index_oid, attno);
+	if (!OidIsValid(opclass))
 	{
 		*isnull = true;
 		return true;
 	}
-	rd_index = (Form_pg_index) GETSTRUCT(tuple);
-
-	/* caller is supposed to guarantee this */
-	Assert(attno > 0 && attno <= rd_index->indnatts);
-
-	datum = SysCacheGetAttr(INDEXRELID, tuple,
-							Anum_pg_index_indclass, &disnull);
-	Assert(!disnull);
-
-	indclass = ((oidvector *) DatumGetPointer(datum));
-	opclass = indclass->values[attno - 1];
-
-	ReleaseSysCache(tuple);
 
 	/* Now look up the opclass family and input datatype. */
-
-	tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
-	if (!HeapTupleIsValid(tuple))
+	if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
 	{
 		*isnull = true;
 		return true;
 	}
-	rd_opclass = (Form_pg_opclass) GETSTRUCT(tuple);
-
-	opfamily = rd_opclass->opcfamily;
-	opcintype = rd_opclass->opcintype;
-
-	ReleaseSysCache(tuple);
 
 	/* And now we can check whether the function is provided. */
-
 	*res = SearchSysCacheExists4(AMPROCNUM,
 								 ObjectIdGetDatum(opfamily),
 								 ObjectIdGetDatum(opcintype),
@@ -968,6 +940,8 @@ gistproperty(Oid index_oid, int attno,
 									  Int16GetDatum(GIST_COMPRESS_PROC));
 	}
 
+	*isnull = false;
+
 	return true;
 }
 
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index bba595a..0c116b3 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -1067,6 +1067,32 @@ get_opclass_input_type(Oid opclass)
 	return result;
 }
 
+/*
+ * get_opclass_opfamily_and_input_type
+ *
+ *		Fetches the opclass's operator family and indexed datatype OIDs into
+ *		*opfamily and *opcintype; returns false if the opclass is not found.
+ */
+bool
+get_opclass_opfamily_and_input_type(Oid opclass, Oid *opfamily, Oid *opcintype)
+{
+	HeapTuple	tp;
+	Form_pg_opclass cla_tup;
+
+	tp = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
+	if (!HeapTupleIsValid(tp))
+		return false;
+
+	cla_tup = (Form_pg_opclass) GETSTRUCT(tp);
+
+	*opfamily = cla_tup->opcfamily;
+	*opcintype = cla_tup->opcintype;
+
+	ReleaseSysCache(tp);
+
+	return true;
+}
+
 /*				---------- OPERATOR CACHE ----------					 */
 
 /*
@@ -3106,3 +3132,45 @@ get_range_subtype(Oid rangeOid)
 	else
 		return InvalidOid;
 }
+
+/*				---------- PG_INDEX CACHE ----------				 */
+
+/*
+ * get_index_column_opclass
+ *
+ *		Given the index OID and column number (1-based),
+ *		return opclass of the index column
+ *			or InvalidOid if the index was not found.
+ */
+Oid
+get_index_column_opclass(Oid index_oid, int attno)
+{
+	HeapTuple	tuple;
+	Form_pg_index rd_index PG_USED_FOR_ASSERTS_ONLY;
+	Datum		datum;
+	bool		isnull;
+	oidvector  *indclass;
+	Oid			opclass;
+
+	/* Look up the index's pg_index entry in the syscache. */
+
+	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
+	if (!HeapTupleIsValid(tuple))
+		return InvalidOid;
+
+	rd_index = (Form_pg_index) GETSTRUCT(tuple);
+
+	/* caller is supposed to guarantee this */
+	Assert(attno > 0 && attno <= rd_index->indnatts);
+
+	datum = SysCacheGetAttr(INDEXRELID, tuple,
+							Anum_pg_index_indclass, &isnull);
+	Assert(!isnull);
+
+	indclass = ((oidvector *) DatumGetPointer(datum));
+	opclass = indclass->values[attno - 1];
+
+	ReleaseSysCache(tuple);
+
+	return opclass;
+}
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index e55ea40..e0eea2a 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -95,6 +95,8 @@ extern char *get_constraint_name(Oid conoid);
 extern char *get_language_name(Oid langoid, bool missing_ok);
 extern Oid	get_opclass_family(Oid opclass);
 extern Oid	get_opclass_input_type(Oid opclass);
+extern bool get_opclass_opfamily_and_input_type(Oid opclass,
+									Oid *opfamily, Oid *opcintype);
 extern RegProcedure get_opcode(Oid opno);
 extern char *get_opname(Oid opno);
 extern Oid	get_op_rettype(Oid opno);
@@ -176,6 +178,7 @@ extern void free_attstatsslot(AttStatsSlot *sslot);
 extern char *get_namespace_name(Oid nspid);
 extern char *get_namespace_name_or_temp(Oid nspid);
 extern Oid	get_range_subtype(Oid rangeOid);
+extern Oid	get_index_column_opclass(Oid index_oid, int attno);
 
 #define type_is_array(typid)  (get_element_type(typid) != InvalidOid)
 /* type_is_array_domain accepts both plain arrays and domains over arrays */
diff --git a/doc/src/sgml/indices.sgml b/doc/src/sgml/indices.sgml
index 14a1aa5..0ad4f73 100644
--- a/doc/src/sgml/indices.sgml
+++ b/doc/src/sgml/indices.sgml
@@ -282,6 +282,14 @@ SELECT * FROM places ORDER BY location <-> point '(101,456)' LIMIT 10;
   </para>
 
   <para>
+   If used with an <link linkend="xindex-ordering-ops">ordering operator</link>, SP-GiST indexes 
+   can optimize <quote>nearest-neighbor</quote> search.
+   For SP-GiST operator classes that support distance ordering, the 
+   corresponding operator is specified in the <quote>Ordering Operators</quote>
+   column in <xref linkend="spgist-builtin-opclasses-table"/>.
+  </para>
+
+  <para>
    <indexterm>
     <primary>index</primary>
     <secondary>GIN</secondary>
diff --git a/doc/src/sgml/spgist.sgml b/doc/src/sgml/spgist.sgml
index 1816fdf..5358bbb 100644
--- a/doc/src/sgml/spgist.sgml
+++ b/doc/src/sgml/spgist.sgml
@@ -630,7 +630,9 @@ CREATE FUNCTION my_inner_consistent(internal, internal) RETURNS void ...
 typedef struct spgInnerConsistentIn
 {
     ScanKey     scankeys;       /* array of operators and comparison values */
+    ScanKey     orderbyKeys;    /* array of ordering operators and comparison values */
     int         nkeys;          /* length of array */
+    int         norderbys;      /* length of array */
 
     Datum       reconstructedValue;     /* value reconstructed at parent */
     void       *traversalValue; /* opclass-specific traverse value */
@@ -653,6 +655,7 @@ typedef struct spgInnerConsistentOut
     int        *levelAdds;      /* increment level by this much for each */
     Datum      *reconstructedValues;    /* associated reconstructed values */
     void      **traversalValues;        /* opclass-specific traverse values */
+    double    **distances;              /* associated distances */
 } spgInnerConsistentOut;
 </programlisting>
 
@@ -667,6 +670,8 @@ typedef struct spgInnerConsistentOut
        In particular it is not necessary to check <structfield>sk_flags</structfield> to
        see if the comparison value is NULL, because the SP-GiST core code
        will filter out such conditions.
+       <structfield>orderbyKeys</structfield>, of length <structfield>norderbys</structfield>,
+       describes ordering operators (if any) in the same fashion.
        <structfield>reconstructedValue</structfield> is the value reconstructed for the
        parent tuple; it is <literal>(Datum) 0</literal> at the root level or if the
        <function>inner_consistent</function> function did not provide a value at the
@@ -709,6 +714,11 @@ typedef struct spgInnerConsistentOut
        of <structname>spgConfigOut</structname>.<structfield>leafType</structfield> type
        reconstructed for each child node to be visited; otherwise, leave
        <structfield>reconstructedValues</structfield> as NULL.
+       If the ordered search is carried out, fill <structfield>distances</structfield>
+       with the distances computed in accordance with the
+       ordering operators provided in <structfield>orderbyKeys</structfield>
+       (nodes with lowest distances will be processed first). Leave it
+       NULL otherwise.
        If it is desired to pass down additional out-of-band information
        (<quote>traverse values</quote>) to lower levels of the tree search,
        set <structfield>traversalValues</structfield> to an array of the appropriate
@@ -717,6 +727,7 @@ typedef struct spgInnerConsistentOut
        Note that the <function>inner_consistent</function> function is
        responsible for palloc'ing the
        <structfield>nodeNumbers</structfield>, <structfield>levelAdds</structfield>,
+       <structfield>distances</structfield>,
        <structfield>reconstructedValues</structfield>, and
        <structfield>traversalValues</structfield> arrays in the current memory context.
        However, any output traverse values pointed to by
@@ -747,7 +758,9 @@ CREATE FUNCTION my_leaf_consistent(internal, internal) RETURNS bool ...
 typedef struct spgLeafConsistentIn
 {
     ScanKey     scankeys;       /* array of operators and comparison values */
-    int         nkeys;          /* length of array */
+    ScanKey     orderbykeys;    /* array of ordering operators and comparison values */
+    int         nkeys;          /* length of scankeys array */
+    int         norderbys;      /* length of orderbykeys array */
 
     Datum       reconstructedValue;     /* value reconstructed at parent */
     void       *traversalValue; /* opclass-specific traverse value */
@@ -759,8 +772,10 @@ typedef struct spgLeafConsistentIn
 
 typedef struct spgLeafConsistentOut
 {
-    Datum       leafValue;      /* reconstructed original data, if any */
-    bool        recheck;        /* set true if operator must be rechecked */
+    Datum       leafValue;        /* reconstructed original data, if any */
+    bool        recheck;          /* set true if operator must be rechecked */
+    bool        recheckDistances; /* set true if distances must be rechecked */
+    double     *distances;        /* associated distances */
 } spgLeafConsistentOut;
 </programlisting>
 
@@ -775,6 +790,8 @@ typedef struct spgLeafConsistentOut
        In particular it is not necessary to check <structfield>sk_flags</structfield> to
        see if the comparison value is NULL, because the SP-GiST core code
        will filter out such conditions.
+       The array <structfield>orderbykeys</structfield>, of length <structfield>norderbys</structfield>,
+       describes the ordering operators in the same fashion.
        <structfield>reconstructedValue</structfield> is the value reconstructed for the
        parent tuple; it is <literal>(Datum) 0</literal> at the root level or if the
        <function>inner_consistent</function> function did not provide a value at the
@@ -803,6 +820,13 @@ typedef struct spgLeafConsistentOut
        <structfield>recheck</structfield> may be set to <literal>true</literal> if the match
        is uncertain and so the operator(s) must be re-applied to the actual
        heap tuple to verify the match.
+       If the ordered search is performed, <structfield>distances</structfield>
+       should be set in accordance with the ordering operators provided; otherwise
+       the implementation is supposed to set it to NULL.
+       If at least one of the returned distances is not exact, the function must set
+       <structfield>recheckDistances</structfield> to true.  In this case, the executor
+       calculates the exact distances after fetching the tuple from the heap,
+       and reorders the tuples if necessary.
       </para>
      </listitem>
     </varlistentry>
diff --git a/doc/src/sgml/xindex.sgml b/doc/src/sgml/xindex.sgml
index 9f5c0c3..1c459c4 100644
--- a/doc/src/sgml/xindex.sgml
+++ b/doc/src/sgml/xindex.sgml
@@ -1242,7 +1242,7 @@ SELECT sum(x) OVER (ORDER BY x RANGE BETWEEN 5 PRECEDING AND 10 FOLLOWING)
   <title>Ordering Operators</title>
 
   <para>
-   Some index access methods (currently, only GiST) support the concept of
+   Some index access methods (currently, only GiST and SP-GiST) support the concept of
    <firstterm>ordering operators</firstterm>.  What we have been discussing so far
    are <firstterm>search operators</firstterm>.  A search operator is one for which
    the index can be searched to find all rows satisfying
diff --git a/src/backend/access/spgist/README b/src/backend/access/spgist/README
index 09ab21a..cb38650 100644
--- a/src/backend/access/spgist/README
+++ b/src/backend/access/spgist/README
@@ -41,7 +41,12 @@ contain exactly one inner tuple.
 
 When the search traversal algorithm reaches an inner tuple, it chooses a set
 of nodes to continue tree traverse in depth.  If it reaches a leaf page it
-scans a list of leaf tuples to find the ones that match the query.
+scans a list of leaf tuples to find the ones that match the query. SP-GiST
+also supports ordered searches: while the scan is performed, the
+implementation can choose to map floating-point distances to inner and leaf
+tuples, and the traversal will be performed by the closest-first model. This
+can be useful e.g. for performing nearest-neighbour searches on the dataset.
+
 
 The insertion algorithm descends the tree similarly, except it must choose
 just one node to descend to from each inner tuple.  Insertion might also have
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index c728080..880057e 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -15,79 +15,158 @@
 
 #include "postgres.h"
 
+#include <math.h>
+
+#include "access/genam.h"
 #include "access/relscan.h"
 #include "access/spgist_private.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "utils/builtins.h"
 #include "utils/datum.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
-
 typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
-							   Datum leafValue, bool isnull, bool recheck);
+							   Datum leafValue, bool isNull, bool recheck,
+							   bool recheckDist, double *distances);
 
-typedef struct ScanStackEntry
+/*
+ * Pairing heap comparison function for the SpGistSearchItem queue
+ */
+static int
+pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a,
+								 const pairingheap_node *b, void *arg)
 {
-	Datum		reconstructedValue; /* value reconstructed from parent */
-	void	   *traversalValue; /* opclass-specific traverse value */
-	int			level;			/* level of items on this page */
-	ItemPointerData ptr;		/* block and offset to scan from */
-} ScanStackEntry;
+	const SpGistSearchItem *sa = (const SpGistSearchItem *) a;
+	const SpGistSearchItem *sb = (const SpGistSearchItem *) b;
+	IndexScanDesc scan = (IndexScanDesc) arg;
+	int			i;
+
+	if (sa->isNull)
+	{
+		if (!sb->isNull)
+			return -1;
+	}
+	else if (sb->isNull)
+	{
+		return 1;
+	}
+	else
+	{
+		/* Order according to distance comparison */
+		for (i = 0; i < scan->numberOfOrderBys; i++)
+		{
+			if (isnan(sa->distances[i]) && isnan(sb->distances[i]))
+				continue;	/* NaN == NaN */
+			if (isnan(sa->distances[i]))
+				return -1;	/* NaN > number */
+			if (isnan(sb->distances[i]))
+				return 1;	/* number < NaN */
+			if (sa->distances[i] != sb->distances[i])
+				return (sa->distances[i] < sb->distances[i]) ? 1 : -1;
+		}
+	}
+
+	/* Leaf items go before inner pages, to ensure a depth-first search */
+	if (sa->isLeaf && !sb->isLeaf)
+		return 1;
+	if (!sa->isLeaf && sb->isLeaf)
+		return -1;
 
+	return 0;
+}
 
-/* Free a ScanStackEntry */
 static void
-freeScanStackEntry(SpGistScanOpaque so, ScanStackEntry *stackEntry)
+spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item)
 {
 	if (!so->state.attLeafType.attbyval &&
-		DatumGetPointer(stackEntry->reconstructedValue) != NULL)
-		pfree(DatumGetPointer(stackEntry->reconstructedValue));
-	if (stackEntry->traversalValue)
-		pfree(stackEntry->traversalValue);
+		DatumGetPointer(item->value) != NULL)
+		pfree(DatumGetPointer(item->value));
+
+	if (item->traversalValue)
+		pfree(item->traversalValue);
 
-	pfree(stackEntry);
+	pfree(item);
 }
 
-/* Free the entire stack */
+/*
+ * Add SpGistSearchItem to queue
+ *
+ * Called in queue context
+ */
 static void
-freeScanStack(SpGistScanOpaque so)
+spgAddSearchItemToQueue(SpGistScanOpaque so, SpGistSearchItem *item)
 {
-	ListCell   *lc;
+	if (so->queue)
+		pairingheap_add(so->queue, &item->phNode);
+	else
+		so->scanStack = lcons(item, so->scanStack);
+}
 
-	foreach(lc, so->scanStack)
-	{
-		freeScanStackEntry(so, (ScanStackEntry *) lfirst(lc));
-	}
-	list_free(so->scanStack);
-	so->scanStack = NIL;
+static SpGistSearchItem *
+spgAllocSearchItem(SpGistScanOpaque so, bool isnull, double *distances)
+{
+	/* allocate distance array only for non-NULL items */
+	SpGistSearchItem *item =
+		palloc(SizeOfSpGistSearchItem(isnull ? 0 : so->numberOfOrderBys));
+
+	item->isNull = isnull;
+
+	if (!isnull && so->numberOfOrderBys > 0)
+		memcpy(item->distances, distances,
+			   so->numberOfOrderBys * sizeof(double));
+
+	return item;
+}
+
+static void
+spgAddStartItem(SpGistScanOpaque so, bool isnull)
+{
+	SpGistSearchItem *startEntry =
+		spgAllocSearchItem(so, isnull, so->zeroDistances);
+
+	ItemPointerSet(&startEntry->heapPtr,
+				   isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO,
+				   FirstOffsetNumber);
+	startEntry->isLeaf = false;
+	startEntry->level = 0;
+	startEntry->value = (Datum) 0;
+	startEntry->traversalValue = NULL;
+	startEntry->recheckDist = false;
+	startEntry->recheckQual = false;
+
+	spgAddSearchItemToQueue(so, startEntry);
 }
 
 /*
- * Initialize scanStack to search the root page, resetting
+ * Initialize queue to search the root page, resetting
  * any previously active scan
  */
 static void
 resetSpGistScanOpaque(SpGistScanOpaque so)
 {
-	ScanStackEntry *startEntry;
-
-	freeScanStack(so);
+	MemoryContext oldCtx = MemoryContextSwitchTo(so->queueCxt);
 
 	if (so->searchNulls)
-	{
-		/* Stack a work item to scan the null index entries */
-		startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
-		ItemPointerSet(&startEntry->ptr, SPGIST_NULL_BLKNO, FirstOffsetNumber);
-		so->scanStack = lappend(so->scanStack, startEntry);
-	}
+		/* Add a work item to scan the null index entries */
+		spgAddStartItem(so, true);
 
 	if (so->searchNonNulls)
+		/* Add a work item to scan the non-null index entries */
+		spgAddStartItem(so, false);
+
+	MemoryContextSwitchTo(oldCtx);
+
+	if (so->numberOfOrderBys > 0)
 	{
-		/* Stack a work item to scan the non-null index entries */
-		startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
-		ItemPointerSet(&startEntry->ptr, SPGIST_ROOT_BLKNO, FirstOffsetNumber);
-		so->scanStack = lappend(so->scanStack, startEntry);
+		/* Must pfree distances to avoid memory leak */
+		int			i;
+
+		for (i = 0; i < so->nPtrs; i++)
+			if (so->distances[i])
+				pfree(so->distances[i]);
 	}
 
 	if (so->want_itup)
@@ -122,6 +201,9 @@ spgPrepareScanKeys(IndexScanDesc scan)
 	int			nkeys;
 	int			i;
 
+	so->numberOfOrderBys = scan->numberOfOrderBys;
+	so->orderByData = scan->orderByData;
+
 	if (scan->numberOfKeys <= 0)
 	{
 		/* If no quals, whole-index scan is required */
@@ -182,8 +264,9 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
 {
 	IndexScanDesc scan;
 	SpGistScanOpaque so;
+	int			i;
 
-	scan = RelationGetIndexScan(rel, keysz, 0);
+	scan = RelationGetIndexScan(rel, keysz, orderbysz);
 
 	so = (SpGistScanOpaque) palloc0(sizeof(SpGistScanOpaqueData));
 	if (keysz > 0)
@@ -191,6 +274,7 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
 	else
 		so->keyData = NULL;
 	initSpGistState(&so->state, scan->indexRelation);
+
 	so->tempCxt = AllocSetContextCreate(CurrentMemoryContext,
 										"SP-GiST search temporary context",
 										ALLOCSET_DEFAULT_SIZES);
@@ -201,6 +285,36 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
 	/* Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan */
 	so->indexTupDesc = scan->xs_hitupdesc = RelationGetDescr(rel);
 
+	if (scan->numberOfOrderBys > 0)
+	{
+		so->zeroDistances = palloc(sizeof(double) * scan->numberOfOrderBys);
+		so->infDistances = palloc(sizeof(double) * scan->numberOfOrderBys);
+
+		for (i = 0; i < scan->numberOfOrderBys; i++)
+		{
+			so->zeroDistances[i] = 0.0;
+			so->infDistances[i] = get_float8_infinity();
+		}
+
+		scan->xs_orderbyvals = palloc0(sizeof(Datum) * scan->numberOfOrderBys);
+		scan->xs_orderbynulls = palloc(sizeof(bool) * scan->numberOfOrderBys);
+		memset(scan->xs_orderbynulls, true, sizeof(bool) * scan->numberOfOrderBys);
+	}
+
+	so->queueCxt = AllocSetContextCreate(CurrentMemoryContext,
+										 "SP-GiST queue context",
+										 ALLOCSET_DEFAULT_SIZES);
+
+	fmgr_info_copy(&so->innerConsistentFn,
+				   index_getprocinfo(rel, 1, SPGIST_INNER_CONSISTENT_PROC),
+				   CurrentMemoryContext);
+
+	fmgr_info_copy(&so->leafConsistentFn,
+				   index_getprocinfo(rel, 1, SPGIST_LEAF_CONSISTENT_PROC),
+				   CurrentMemoryContext);
+
+	so->indexCollation = rel->rd_indcollation[0];
+
 	scan->opaque = so;
 
 	return scan;
@@ -211,21 +325,58 @@ spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 		  ScanKey orderbys, int norderbys)
 {
 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
+	MemoryContext oldCxt;
 
 	/* clear traversal context before proceeding to the next scan */
 	MemoryContextReset(so->traversalCxt);
 
 	/* copy scankeys into local storage */
 	if (scankey && scan->numberOfKeys > 0)
-	{
 		memmove(scan->keyData, scankey,
 				scan->numberOfKeys * sizeof(ScanKeyData));
+
+	if (orderbys && scan->numberOfOrderBys > 0)
+	{
+		int			i;
+
+		memmove(scan->orderByData, orderbys,
+				scan->numberOfOrderBys * sizeof(ScanKeyData));
+
+		so->orderByTypes = (Oid *) palloc(sizeof(Oid) * scan->numberOfOrderBys);
+
+		for (i = 0; i < scan->numberOfOrderBys; i++)
+		{
+			ScanKey		skey = &scan->orderByData[i];
+
+			/*
+			 * Look up the datatype returned by the original ordering
+			 * operator. SP-GiST always uses a float8 for the distance function,
+			 * but the ordering operator could be anything else.
+			 *
+			 * XXX: The distance function is only allowed to be lossy if the
+			 * ordering operator's result type is float4 or float8.  Otherwise
+			 * we don't know how to return the distance to the executor.  But
+			 * we cannot check that here, as we won't know if the distance
+			 * function is lossy until it returns *recheck = true for the
+			 * first time.
+			 */
+			so->orderByTypes[i] = get_func_rettype(skey->sk_func.fn_oid);
+		}
 	}
 
 	/* preprocess scankeys, set up the representation in *so */
 	spgPrepareScanKeys(scan);
 
-	/* set up starting stack entries */
+	so->scanStack = NIL;
+
+	MemoryContextReset(so->queueCxt);
+	oldCxt = MemoryContextSwitchTo(so->queueCxt);
+	/* initialize queue only for distance-ordered scans */
+	so->queue = scan->numberOfOrderBys <= 0 ? NULL :
+		pairingheap_allocate(pairingheap_SpGistSearchItem_cmp, scan);
+	MemoryContextSwitchTo(oldCxt);
+
+	/* set up starting queue entries */
 	resetSpGistScanOpaque(so);
 }
 
@@ -236,65 +387,349 @@ spgendscan(IndexScanDesc scan)
 
 	MemoryContextDelete(so->tempCxt);
 	MemoryContextDelete(so->traversalCxt);
+	MemoryContextDelete(so->queueCxt);
+
+	if (scan->numberOfOrderBys > 0)
+	{
+		pfree(so->zeroDistances);
+		pfree(so->infDistances);
+	}
+}
+
+/*
+ * Build a search item for a leaf (heap) tuple; call with queueCxt current
+ */
+static SpGistSearchItem *
+spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr,
+			   Datum leafValue, bool recheckQual, bool recheckDist, bool isnull,
+			   double *distances)
+{
+	SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
+
+	item->level = level;
+	item->heapPtr = *heapPtr;
+	/* copy non-null value into the current (queue) context, out of temp cxt */
+	item->value = isnull ? (Datum) 0 :
+		datumCopy(leafValue, so->state.attLeafType.attbyval,
+				  so->state.attLeafType.attlen);
+	item->traversalValue = NULL;
+	item->isLeaf = true;
+	item->recheckQual = recheckQual;
+	item->recheckDist = recheckDist;
+
+	return item;
 }
 
 /*
  * Test whether a leaf tuple satisfies all the scan keys
  *
- * *leafValue is set to the reconstructed datum, if provided
- * *recheck is set true if any of the operators are lossy
+ * A passing item is queued if the scan is ordered; otherwise it is reported
+ * through storeRes right away and *reportedSome is set to true.
  */
 static bool
-spgLeafTest(Relation index, SpGistScanOpaque so,
+spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
 			SpGistLeafTuple leafTuple, bool isnull,
-			int level, Datum reconstructedValue,
-			void *traversalValue,
-			Datum *leafValue, bool *recheck)
+			bool *reportedSome, storeRes_func storeRes)
 {
+	Datum		leafValue;
+	double	   *distances;
 	bool		result;
-	Datum		leafDatum;
-	spgLeafConsistentIn in;
-	spgLeafConsistentOut out;
-	FmgrInfo   *procinfo;
-	MemoryContext oldCtx;
+	bool		recheckQual;
+	bool		recheckDist;
 
 	if (isnull)
 	{
 		/* Should not have arrived on a nulls page unless nulls are wanted */
 		Assert(so->searchNulls);
-		*leafValue = (Datum) 0;
-		*recheck = false;
-		return true;
+		leafValue = (Datum) 0;
+		distances = NULL;
+		recheckQual = false;
+		recheckDist = false;
+		result = true;
+	}
+	else
+	{
+		spgLeafConsistentIn in;
+		spgLeafConsistentOut out;
+
+		/* use temp context for calling leaf_consistent */
+		MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt);
+
+		in.scankeys = so->keyData;
+		in.nkeys = so->numberOfKeys;
+		in.orderbykeys = so->orderByData;
+		in.norderbys = so->numberOfOrderBys;
+		in.reconstructedValue = item->value;
+		in.traversalValue = item->traversalValue;
+		in.level = item->level;
+		in.returnData = so->want_itup;
+		in.leafDatum = SGLTDATUM(leafTuple, &so->state);
+
+		out.leafValue = (Datum) 0;
+		out.recheck = false;
+		out.distances = NULL;
+		out.recheckDistances = false;
+
+		result = DatumGetBool(FunctionCall2Coll(&so->leafConsistentFn,
+												so->indexCollation,
+												PointerGetDatum(&in),
+												PointerGetDatum(&out)));
+		recheckQual = out.recheck;
+		recheckDist = out.recheckDistances;
+		leafValue = out.leafValue;
+		distances = out.distances;
+
+		MemoryContextSwitchTo(oldCxt);
 	}
 
-	leafDatum = SGLTDATUM(leafTuple, &so->state);
+	if (result)
+	{
+		/* item passes the scankeys */
+		if (so->numberOfOrderBys > 0)
+		{
+			/* the scan is ordered -> copy the item into the queue context */
+			MemoryContext oldCxt = MemoryContextSwitchTo(so->queueCxt);
+			SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level,
+														&leafTuple->heapPtr,
+														leafValue,
+														recheckQual,
+														recheckDist,
+														isnull,
+														distances);
+
+			spgAddSearchItemToQueue(so, heapItem);
+
+			MemoryContextSwitchTo(oldCxt);
+		}
+		else
+		{
+			/* non-ordered scan, so report the item right away */
+			Assert(!recheckDist);
+			storeRes(so, &leafTuple->heapPtr, leafValue, isnull,
+					 recheckQual, false, NULL);
+			*reportedSome = true;
+		}
+	}
 
-	/* use temp context for calling leaf_consistent */
-	oldCtx = MemoryContextSwitchTo(so->tempCxt);
+	return result;
+}
 
-	in.scankeys = so->keyData;
-	in.nkeys = so->numberOfKeys;
-	in.reconstructedValue = reconstructedValue;
-	in.traversalValue = traversalValue;
-	in.level = level;
-	in.returnData = so->want_itup;
-	in.leafDatum = leafDatum;
+/* Fill in the input struct passed to the opclass inner_consistent method */
+static void
+spgInitInnerConsistentIn(spgInnerConsistentIn *in,
+						 SpGistScanOpaque so,
+						 SpGistSearchItem *item,
+						 SpGistInnerTuple innerTuple)
+{
+	in->scankeys = so->keyData;
+	in->nkeys = so->numberOfKeys;
+	in->norderbys = so->numberOfOrderBys;
+	in->orderbyKeys = so->orderByData;
+	in->reconstructedValue = item->value;
+	in->traversalMemoryContext = so->traversalCxt;
+	in->traversalValue = item->traversalValue;
+	in->level = item->level;
+	in->returnData = so->want_itup;
+	in->allTheSame = innerTuple->allTheSame;
+	in->hasPrefix = (innerTuple->prefixSize > 0);
+	in->prefixDatum = SGITDATUM(innerTuple, &so->state);
+	in->nNodes = innerTuple->nNodes;
+	in->nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
+}
 
-	out.leafValue = (Datum) 0;
-	out.recheck = false;
+static SpGistSearchItem *
+spgMakeInnerItem(SpGistScanOpaque so,
+				 SpGistSearchItem *parentItem,
+				 SpGistNodeTuple tuple,
+				 spgInnerConsistentOut *out, int i, bool isnull,
+				 double *distances)
+{
+	SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
+
+	item->heapPtr = tuple->t_tid;
+	item->level = out->levelAdds ? parentItem->level + out->levelAdds[i]
+								 : parentItem->level;
+
+	/* Must copy value out of temp context into the current context */
+	item->value = out->reconstructedValues
+					? datumCopy(out->reconstructedValues[i],
+								so->state.attLeafType.attbyval,
+								so->state.attLeafType.attlen)
+					: (Datum) 0;
+
+	/*
+	 * Elements of out.traversalValues should be allocated in
+	 * in.traversalMemoryContext, which is a long-lived context of the
+	 * index scan, so the pointer is stored without copying.
+	 */
+	item->traversalValue =
+			out->traversalValues ? out->traversalValues[i] : NULL;
+
+	item->isLeaf = false;
+	item->recheckQual = false;
+	item->recheckDist = false;
+
+	return item;
+}
 
-	procinfo = index_getprocinfo(index, 1, SPGIST_LEAF_CONSISTENT_PROC);
-	result = DatumGetBool(FunctionCall2Coll(procinfo,
-											index->rd_indcollation[0],
-											PointerGetDatum(&in),
-											PointerGetDatum(&out)));
+static void
+spgInnerTest(SpGistScanOpaque so, SpGistSearchItem *item,
+			 SpGistInnerTuple innerTuple, bool isnull)
+{
+	MemoryContext			oldCxt = MemoryContextSwitchTo(so->tempCxt);
+	spgInnerConsistentOut	out;
+	int						nNodes = innerTuple->nNodes;
+	int						i;
 
-	*leafValue = out.leafValue;
-	*recheck = out.recheck;
+	memset(&out, 0, sizeof(out));
 
-	MemoryContextSwitchTo(oldCtx);
+	if (!isnull)
+	{
+		spgInnerConsistentIn in;
 
-	return result;
+		spgInitInnerConsistentIn(&in, so, item, innerTuple);
+
+		/* use user-defined inner consistent method */
+		FunctionCall2Coll(&so->innerConsistentFn,
+						  so->indexCollation,
+						  PointerGetDatum(&in),
+						  PointerGetDatum(&out));
+	}
+	else
+	{
+		/* force all children to be visited */
+		out.nNodes = nNodes;
+		out.nodeNumbers = (int *) palloc(sizeof(int) * nNodes);
+		for (i = 0; i < nNodes; i++)
+			out.nodeNumbers[i] = i;
+	}
+
+	/* If allTheSame, they should all or none of 'em match */
+	if (innerTuple->allTheSame)
+		if (out.nNodes != 0 && out.nNodes != nNodes)
+			elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
+
+	if (out.nNodes)
+	{
+		/* collect node pointers */
+		SpGistNodeTuple		node;
+		SpGistNodeTuple	   *nodes = (SpGistNodeTuple *) palloc(
+									sizeof(SpGistNodeTuple) * nNodes);
+
+		SGITITERATE(innerTuple, i, node)
+		{
+			nodes[i] = node;
+		}
+
+		MemoryContextSwitchTo(so->queueCxt);	/* new items live in queueCxt */
+
+		for (i = 0; i < out.nNodes; i++)
+		{
+			int			nodeN = out.nodeNumbers[i];
+			SpGistSearchItem *innerItem;
+			double	   *distances;
+
+			Assert(nodeN >= 0 && nodeN < nNodes);
+
+			node = nodes[nodeN];
+
+			if (!ItemPointerIsValid(&node->t_tid))
+				continue;
+
+			/*
+			 * Use infinity distances if innerConsistent() failed to return
+			 * them or if this is a NULL item (whose distances are unused).
+			 */
+			distances = out.distances ? out.distances[i] : so->infDistances;
+
+			innerItem = spgMakeInnerItem(so, item, node, &out, i, isnull,
+										 distances);
+
+			spgAddSearchItemToQueue(so, innerItem);
+		}
+	}
+
+	MemoryContextSwitchTo(oldCxt);
+}
+
+/* Return next item from the queue (ordered scan) or stack; NULL if none left */
+static SpGistSearchItem *
+spgGetNextQueueItem(SpGistScanOpaque so)
+{
+	SpGistSearchItem *item;
+
+	if (so->queue)
+	{
+		if (pairingheap_is_empty(so->queue))
+			return NULL;	/* Done when the queue is empty */
+
+		item = (SpGistSearchItem *) pairingheap_remove_first(so->queue);
+	}
+	else
+	{
+		if (!so->scanStack)
+			return NULL;	/* there are no more items to scan */
+
+		item = (SpGistSearchItem *) linitial(so->scanStack);
+		so->scanStack = list_delete_first(so->scanStack);
+	}
+
+	/* Return item; caller is responsible for freeing it */
+	return item;
+}
+
+enum SpGistSpecialOffsetNumbers
+{
+	SpGistBreakOffsetNumber = InvalidOffsetNumber,	/* stop walking the chain */
+	SpGistRedirectOffsetNumber = MaxOffsetNumber + 1,	/* follow a redirect */
+	SpGistErrorOffsetNumber = MaxOffsetNumber + 2	/* returned after elog(ERROR) */
+};
+
+static OffsetNumber
+spgTestLeafTuple(SpGistScanOpaque so,
+				 SpGistSearchItem *item,
+				 Page page, OffsetNumber offset,
+				 bool isnull, bool isroot,
+				 bool *reportedSome,
+				 storeRes_func storeRes)
+{
+	SpGistLeafTuple leafTuple = (SpGistLeafTuple)
+		PageGetItem(page, PageGetItemId(page, offset));
+
+	if (leafTuple->tupstate != SPGIST_LIVE)
+	{
+		if (!isroot) /* all tuples on root should be live */
+		{
+			if (leafTuple->tupstate == SPGIST_REDIRECT)
+			{
+				/* redirection tuple should be first in chain */
+				Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
+				/* transfer attention to redirect point; caller restarts there */
+				item->heapPtr = ((SpGistDeadTuple) leafTuple)->pointer;
+				Assert(ItemPointerGetBlockNumber(&item->heapPtr) != SPGIST_METAPAGE_BLKNO);
+				return SpGistRedirectOffsetNumber;
+			}
+
+			if (leafTuple->tupstate == SPGIST_DEAD)
+			{
+				/* dead tuple should be first in chain */
+				Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
+				/* No live entries on this page */
+				Assert(leafTuple->nextOffset == InvalidOffsetNumber);
+				return SpGistBreakOffsetNumber;
+			}
+		}
+
+		/* We should not arrive at a placeholder */
+		elog(ERROR, "unexpected SPGiST tuple state: %d", leafTuple->tupstate);
+		return SpGistErrorOffsetNumber;
+	}
+
+	Assert(ItemPointerIsValid(&leafTuple->heapPtr));
+
+	spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes);
+
+	return leafTuple->nextOffset;	/* next tuple in the chain, if any */
 }
 
 /*
@@ -313,247 +748,101 @@ spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
 
 	while (scanWholeIndex || !reportedSome)
 	{
-		ScanStackEntry *stackEntry;
-		BlockNumber blkno;
-		OffsetNumber offset;
-		Page		page;
-		bool		isnull;
-
-		/* Pull next to-do item from the list */
-		if (so->scanStack == NIL)
-			break;				/* there are no more pages to scan */
+		SpGistSearchItem *item = spgGetNextQueueItem(so);
 
-		stackEntry = (ScanStackEntry *) linitial(so->scanStack);
-		so->scanStack = list_delete_first(so->scanStack);
+		if (item == NULL)
+			break; /* No more items in queue -> done */
 
 redirect:
 		/* Check for interrupts, just in case of infinite loop */
 		CHECK_FOR_INTERRUPTS();
 
-		blkno = ItemPointerGetBlockNumber(&stackEntry->ptr);
-		offset = ItemPointerGetOffsetNumber(&stackEntry->ptr);
-
-		if (buffer == InvalidBuffer)
+		if (item->isLeaf)
 		{
-			buffer = ReadBuffer(index, blkno);
-			LockBuffer(buffer, BUFFER_LOCK_SHARE);
+			/* We store heap items in the queue only in case of ordered search */
+			Assert(so->numberOfOrderBys > 0);
+			storeRes(so, &item->heapPtr, item->value, item->isNull,
+					 item->recheckQual, item->recheckDist, item->distances);
+			reportedSome = true;
 		}
-		else if (blkno != BufferGetBlockNumber(buffer))
+		else
 		{
-			UnlockReleaseBuffer(buffer);
-			buffer = ReadBuffer(index, blkno);
-			LockBuffer(buffer, BUFFER_LOCK_SHARE);
-		}
-		/* else new pointer points to the same page, no work needed */
+			BlockNumber		blkno  = ItemPointerGetBlockNumber(&item->heapPtr);
+			OffsetNumber	offset = ItemPointerGetOffsetNumber(&item->heapPtr);
+			Page			page;
+			bool			isnull;
 
-		page = BufferGetPage(buffer);
-		TestForOldSnapshot(snapshot, index, page);
+			if (buffer == InvalidBuffer)
+			{
+				buffer = ReadBuffer(index, blkno);
+				LockBuffer(buffer, BUFFER_LOCK_SHARE);
+			}
+			else if (blkno != BufferGetBlockNumber(buffer))
+			{
+				UnlockReleaseBuffer(buffer);
+				buffer = ReadBuffer(index, blkno);
+				LockBuffer(buffer, BUFFER_LOCK_SHARE);
+			}
 
-		isnull = SpGistPageStoresNulls(page) ? true : false;
+			/* else new pointer points to the same page, no work needed */
 
-		if (SpGistPageIsLeaf(page))
-		{
-			SpGistLeafTuple leafTuple;
-			OffsetNumber max = PageGetMaxOffsetNumber(page);
-			Datum		leafValue = (Datum) 0;
-			bool		recheck = false;
+			page = BufferGetPage(buffer);
+			TestForOldSnapshot(snapshot, index, page);
+
+			isnull = SpGistPageStoresNulls(page) ? true : false;
 
-			if (SpGistBlockIsRoot(blkno))
+			if (SpGistPageIsLeaf(page))
 			{
-				/* When root is a leaf, examine all its tuples */
-				for (offset = FirstOffsetNumber; offset <= max; offset++)
-				{
-					leafTuple = (SpGistLeafTuple)
-						PageGetItem(page, PageGetItemId(page, offset));
-					if (leafTuple->tupstate != SPGIST_LIVE)
-					{
-						/* all tuples on root should be live */
-						elog(ERROR, "unexpected SPGiST tuple state: %d",
-							 leafTuple->tupstate);
-					}
+				/* Page is a leaf - that is, all its tuples are heap items */
+				OffsetNumber max = PageGetMaxOffsetNumber(page);
 
-					Assert(ItemPointerIsValid(&leafTuple->heapPtr));
-					if (spgLeafTest(index, so,
-									leafTuple, isnull,
-									stackEntry->level,
-									stackEntry->reconstructedValue,
-									stackEntry->traversalValue,
-									&leafValue,
-									&recheck))
-					{
-						storeRes(so, &leafTuple->heapPtr,
-								 leafValue, isnull, recheck);
-						reportedSome = true;
-					}
+				if (SpGistBlockIsRoot(blkno))
+				{
+					/* When root is a leaf, examine all its tuples */
+					for (offset = FirstOffsetNumber; offset <= max; offset++)
+						(void) spgTestLeafTuple(so, item, page, offset,
+												isnull, true,
+												&reportedSome, storeRes);
 				}
-			}
-			else
-			{
-				/* Normal case: just examine the chain we arrived at */
-				while (offset != InvalidOffsetNumber)
+				else
 				{
-					Assert(offset >= FirstOffsetNumber && offset <= max);
-					leafTuple = (SpGistLeafTuple)
-						PageGetItem(page, PageGetItemId(page, offset));
-					if (leafTuple->tupstate != SPGIST_LIVE)
+					/* Normal case: just examine the chain we arrived at */
+					while (offset != InvalidOffsetNumber)
 					{
-						if (leafTuple->tupstate == SPGIST_REDIRECT)
-						{
-							/* redirection tuple should be first in chain */
-							Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr));
-							/* transfer attention to redirect point */
-							stackEntry->ptr = ((SpGistDeadTuple) leafTuple)->pointer;
-							Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO);
+						Assert(offset >= FirstOffsetNumber && offset <= max);
+						offset = spgTestLeafTuple(so, item, page, offset,
+												  isnull, false,
+												  &reportedSome, storeRes);
+						if (offset == SpGistRedirectOffsetNumber)
 							goto redirect;
-						}
-						if (leafTuple->tupstate == SPGIST_DEAD)
-						{
-							/* dead tuple should be first in chain */
-							Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr));
-							/* No live entries on this page */
-							Assert(leafTuple->nextOffset == InvalidOffsetNumber);
-							break;
-						}
-						/* We should not arrive at a placeholder */
-						elog(ERROR, "unexpected SPGiST tuple state: %d",
-							 leafTuple->tupstate);
 					}
-
-					Assert(ItemPointerIsValid(&leafTuple->heapPtr));
-					if (spgLeafTest(index, so,
-									leafTuple, isnull,
-									stackEntry->level,
-									stackEntry->reconstructedValue,
-									stackEntry->traversalValue,
-									&leafValue,
-									&recheck))
-					{
-						storeRes(so, &leafTuple->heapPtr,
-								 leafValue, isnull, recheck);
-						reportedSome = true;
-					}
-
-					offset = leafTuple->nextOffset;
 				}
 			}
-		}
-		else					/* page is inner */
-		{
-			SpGistInnerTuple innerTuple;
-			spgInnerConsistentIn in;
-			spgInnerConsistentOut out;
-			FmgrInfo   *procinfo;
-			SpGistNodeTuple *nodes;
-			SpGistNodeTuple node;
-			int			i;
-			MemoryContext oldCtx;
-
-			innerTuple = (SpGistInnerTuple) PageGetItem(page,
-														PageGetItemId(page, offset));
-
-			if (innerTuple->tupstate != SPGIST_LIVE)
-			{
-				if (innerTuple->tupstate == SPGIST_REDIRECT)
-				{
-					/* transfer attention to redirect point */
-					stackEntry->ptr = ((SpGistDeadTuple) innerTuple)->pointer;
-					Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO);
-					goto redirect;
-				}
-				elog(ERROR, "unexpected SPGiST tuple state: %d",
-					 innerTuple->tupstate);
-			}
-
-			/* use temp context for calling inner_consistent */
-			oldCtx = MemoryContextSwitchTo(so->tempCxt);
-
-			in.scankeys = so->keyData;
-			in.nkeys = so->numberOfKeys;
-			in.reconstructedValue = stackEntry->reconstructedValue;
-			in.traversalMemoryContext = so->traversalCxt;
-			in.traversalValue = stackEntry->traversalValue;
-			in.level = stackEntry->level;
-			in.returnData = so->want_itup;
-			in.allTheSame = innerTuple->allTheSame;
-			in.hasPrefix = (innerTuple->prefixSize > 0);
-			in.prefixDatum = SGITDATUM(innerTuple, &so->state);
-			in.nNodes = innerTuple->nNodes;
-			in.nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
-
-			/* collect node pointers */
-			nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * in.nNodes);
-			SGITITERATE(innerTuple, i, node)
-			{
-				nodes[i] = node;
-			}
-
-			memset(&out, 0, sizeof(out));
-
-			if (!isnull)
-			{
-				/* use user-defined inner consistent method */
-				procinfo = index_getprocinfo(index, 1, SPGIST_INNER_CONSISTENT_PROC);
-				FunctionCall2Coll(procinfo,
-								  index->rd_indcollation[0],
-								  PointerGetDatum(&in),
-								  PointerGetDatum(&out));
-			}
-			else
-			{
-				/* force all children to be visited */
-				out.nNodes = in.nNodes;
-				out.nodeNumbers = (int *) palloc(sizeof(int) * in.nNodes);
-				for (i = 0; i < in.nNodes; i++)
-					out.nodeNumbers[i] = i;
-			}
-
-			MemoryContextSwitchTo(oldCtx);
-
-			/* If allTheSame, they should all or none of 'em match */
-			if (innerTuple->allTheSame)
-				if (out.nNodes != 0 && out.nNodes != in.nNodes)
-					elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
-
-			for (i = 0; i < out.nNodes; i++)
+			else	/* page is inner */
 			{
-				int			nodeN = out.nodeNumbers[i];
+				SpGistInnerTuple innerTuple = (SpGistInnerTuple)
+						PageGetItem(page, PageGetItemId(page, offset));
 
-				Assert(nodeN >= 0 && nodeN < in.nNodes);
-				if (ItemPointerIsValid(&nodes[nodeN]->t_tid))
+				if (innerTuple->tupstate != SPGIST_LIVE)
 				{
-					ScanStackEntry *newEntry;
-
-					/* Create new work item for this node */
-					newEntry = palloc(sizeof(ScanStackEntry));
-					newEntry->ptr = nodes[nodeN]->t_tid;
-					if (out.levelAdds)
-						newEntry->level = stackEntry->level + out.levelAdds[i];
-					else
-						newEntry->level = stackEntry->level;
-					/* Must copy value out of temp context */
-					if (out.reconstructedValues)
-						newEntry->reconstructedValue =
-							datumCopy(out.reconstructedValues[i],
-									  so->state.attLeafType.attbyval,
-									  so->state.attLeafType.attlen);
-					else
-						newEntry->reconstructedValue = (Datum) 0;
-
-					/*
-					 * Elements of out.traversalValues should be allocated in
-					 * in.traversalMemoryContext, which is actually a long
-					 * lived context of index scan.
-					 */
-					newEntry->traversalValue = (out.traversalValues) ?
-						out.traversalValues[i] : NULL;
-
-					so->scanStack = lcons(newEntry, so->scanStack);
+					if (innerTuple->tupstate == SPGIST_REDIRECT)
+					{
+						/* transfer attention to redirect point */
+						item->heapPtr = ((SpGistDeadTuple) innerTuple)->pointer;
+						Assert(ItemPointerGetBlockNumber(&item->heapPtr) !=
+							   SPGIST_METAPAGE_BLKNO);
+						goto redirect;
+					}
+					elog(ERROR, "unexpected SPGiST tuple state: %d",
+						 innerTuple->tupstate);
 				}
+
+				spgInnerTest(so, item, innerTuple, isnull);
 			}
 		}
 
-		/* done with this scan stack entry */
-		freeScanStackEntry(so, stackEntry);
+		/* done with this scan item */
+		spgFreeSearchItem(so, item);
 		/* clear temp context before proceeding to the next one */
 		MemoryContextReset(so->tempCxt);
 	}
@@ -562,11 +851,14 @@ redirect:
 		UnlockReleaseBuffer(buffer);
 }
 
+
 /* storeRes subroutine for getbitmap case */
 static void
 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
-			Datum leafValue, bool isnull, bool recheck)
+			Datum leafValue, bool isnull, bool recheck, bool recheckDist,
+			double *distances)
 {
+	Assert(!recheckDist && !distances);	/* assumes bitmap scans are unordered */
 	tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
 	so->ntids++;
 }
@@ -590,11 +882,26 @@ spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 /* storeRes subroutine for gettuple case */
 static void
 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
-			  Datum leafValue, bool isnull, bool recheck)
+			  Datum leafValue, bool isnull, bool recheck, bool recheckDist,
+			  double *distances)
 {
 	Assert(so->nPtrs < MaxIndexTuplesPerPage);
 	so->heapPtrs[so->nPtrs] = *heapPtr;
 	so->recheck[so->nPtrs] = recheck;
+	so->recheckDist[so->nPtrs] = recheckDist;
+
+	if (so->numberOfOrderBys > 0)
+	{
+		if (isnull)
+			so->distances[so->nPtrs] = NULL;
+		else
+		{
+			Size		size = sizeof(double) * so->numberOfOrderBys;
+
+			so->distances[so->nPtrs] = memcpy(palloc(size), distances, size);
+		}
+	}
+
 	if (so->want_itup)
 	{
 		/*
@@ -623,14 +930,29 @@ spggettuple(IndexScanDesc scan, ScanDirection dir)
 	{
 		if (so->iPtr < so->nPtrs)
 		{
-			/* continuing to return tuples from a leaf page */
+			/* continuing to return reported tuples */
 			scan->xs_ctup.t_self = so->heapPtrs[so->iPtr];
 			scan->xs_recheck = so->recheck[so->iPtr];
 			scan->xs_hitup = so->reconTups[so->iPtr];
+
+			if (so->numberOfOrderBys > 0)
+				index_store_float8_orderby_distances(scan, so->orderByTypes,
+													 so->distances[so->iPtr],
+													 so->recheckDist[so->iPtr]);
 			so->iPtr++;
 			return true;
 		}
 
+		if (so->numberOfOrderBys > 0)
+		{
+			/* Must pfree distances to avoid memory leak */
+			int			i;
+
+			for (i = 0; i < so->nPtrs; i++)
+				if (so->distances[i])
+					pfree(so->distances[i]);
+		}
+
 		if (so->want_itup)
 		{
 			/* Must pfree reconstructed tuples to avoid memory leak */
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index 4a9b5da..ffbba78 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -15,17 +15,26 @@
 
 #include "postgres.h"
 
+#include "access/amvalidate.h"
+#include "access/htup_details.h"
 #include "access/reloptions.h"
 #include "access/spgist_private.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "catalog/pg_amop.h"
+#include "optimizer/paths.h"
 #include "storage/bufmgr.h"
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
+#include "utils/catcache.h"
 #include "utils/index_selfuncs.h"
 #include "utils/lsyscache.h"
+#include "utils/syscache.h"
 
+extern Expr *spgcanorderbyop(IndexOptInfo *index,
+							 PathKey *pathkey, int pathkeyno,
+							 Expr *orderby_clause, int *indexcol_p);
 
 /*
  * SP-GiST handler function: return IndexAmRoutine with access method parameters
@@ -39,7 +48,7 @@ spghandler(PG_FUNCTION_ARGS)
 	amroutine->amstrategies = 0;
 	amroutine->amsupport = SPGISTNProc;
 	amroutine->amcanorder = false;
-	amroutine->amcanorderbyop = false;
+	amroutine->amcanorderbyop = true;
 	amroutine->amcanbackward = false;
 	amroutine->amcanunique = false;
 	amroutine->amcanmulticol = false;
@@ -61,7 +70,7 @@ spghandler(PG_FUNCTION_ARGS)
 	amroutine->amcanreturn = spgcanreturn;
 	amroutine->amcostestimate = spgcostestimate;
 	amroutine->amoptions = spgoptions;
-	amroutine->amproperty = NULL;
+	amroutine->amproperty = spgproperty;
 	amroutine->amvalidate = spgvalidate;
 	amroutine->ambeginscan = spgbeginscan;
 	amroutine->amrescan = spgrescan;
@@ -949,3 +958,82 @@ SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
 
 	return offnum;
 }
+
+/*
+ *	spgproperty() -- Check boolean properties of indexes.
+ *
+ * This is optional for most AMs, but is required for SP-GiST because the core
+ * property code doesn't support AMPROP_DISTANCE_ORDERABLE.
+ */
+bool
+spgproperty(Oid index_oid, int attno,
+			IndexAMProperty prop, const char *propname,
+			bool *res, bool *isnull)
+{
+	Oid			opclass,
+				opfamily,
+				opcintype;
+	CatCList   *catlist;
+	int			i;
+
+	/* Only answer column-level inquiries */
+	if (attno == 0)
+		return false;
+
+	switch (prop)
+	{
+		case AMPROP_DISTANCE_ORDERABLE:
+			break;
+		default:
+			return false;
+	}
+
+	/*
+	 * Currently, SP-GiST distance-ordered scans require that there be a
+	 * distance operator in the opclass with the default types. So we assume
+	 * that if such an operator exists, then there's a reason for it.
+	 */
+
+	/* First we need to know the column's opclass. */
+	opclass = get_index_column_opclass(index_oid, attno);
+	if (!OidIsValid(opclass))
+	{
+		*isnull = true;
+		return true;
+	}
+
+	/* Now look up the opclass family and input datatype. */
+	if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
+	{
+		*isnull = true;
+		return true;
+	}
+
+	/* And now we can check whether an ordering operator is provided. */
+	catlist = SearchSysCacheList1(AMOPSTRATEGY,
+								  ObjectIdGetDatum(opfamily));
+
+	*res = false;
+
+	for (i = 0; i < catlist->n_members; i++)
+	{
+		HeapTuple	amoptup = &catlist->members[i]->tuple;
+		Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup);
+
+		if (amopform->amoppurpose == AMOP_ORDER &&
+			(amopform->amoplefttype == opcintype ||
+			 amopform->amoprighttype == opcintype) &&
+			 opfamily_can_sort_type(amopform->amopsortfamily,
+									get_op_rettype(amopform->amopopr)))
+		{
+			*res = true;
+			break;
+		}
+	}
+
+	ReleaseSysCacheList(catlist);
+
+	*isnull = false;
+
+	return true;
+}
diff --git a/src/backend/access/spgist/spgvalidate.c b/src/backend/access/spgist/spgvalidate.c
index 619c357..898fc38 100644
--- a/src/backend/access/spgist/spgvalidate.c
+++ b/src/backend/access/spgist/spgvalidate.c
@@ -187,6 +187,7 @@ spgvalidate(Oid opclassoid)
 	{
 		HeapTuple	oprtup = &oprlist->members[i]->tuple;
 		Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
+		Oid			op_rettype;
 
 		/* TODO: Check that only allowed strategy numbers exist */
 		if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63)
@@ -200,20 +201,26 @@ spgvalidate(Oid opclassoid)
 			result = false;
 		}
 
-		/* spgist doesn't support ORDER BY operators */
-		if (oprform->amoppurpose != AMOP_SEARCH ||
-			OidIsValid(oprform->amopsortfamily))
+		/* spgist supports ORDER BY operators */
+		if (oprform->amoppurpose != AMOP_SEARCH)
 		{
-			ereport(INFO,
-					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
-					 errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s",
-							opfamilyname, "spgist",
-							format_operator(oprform->amopopr))));
-			result = false;
+			/* ... and operator result must match the claimed btree opfamily */
+			op_rettype = get_op_rettype(oprform->amopopr);
+			if (!opfamily_can_sort_type(oprform->amopsortfamily, op_rettype))
+			{
+				ereport(INFO,
+						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+						 errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s",
+								opfamilyname, "spgist",
+								format_operator(oprform->amopopr))));
+				result = false;
+			}
 		}
+		else
+			op_rettype = BOOLOID;
 
 		/* Check operator signature --- same for all spgist strategies */
-		if (!check_amop_signature(oprform->amopopr, BOOLOID,
+		if (!check_amop_signature(oprform->amopopr, op_rettype,
 								  oprform->amoplefttype,
 								  oprform->amoprighttype))
 		{
diff --git a/src/include/access/spgist.h b/src/include/access/spgist.h
index c6d7e22..e626efc 100644
--- a/src/include/access/spgist.h
+++ b/src/include/access/spgist.h
@@ -136,7 +136,9 @@ typedef struct spgPickSplitOut
 typedef struct spgInnerConsistentIn
 {
 	ScanKey		scankeys;		/* array of operators and comparison values */
+	ScanKey		orderbyKeys;
 	int			nkeys;			/* length of array */
+	int			norderbys;
 
 	Datum		reconstructedValue; /* value reconstructed at parent */
 	void	   *traversalValue; /* opclass-specific traverse value */
@@ -159,6 +161,7 @@ typedef struct spgInnerConsistentOut
 	int		   *levelAdds;		/* increment level by this much for each */
 	Datum	   *reconstructedValues;	/* associated reconstructed values */
 	void	  **traversalValues;	/* opclass-specific traverse values */
+	double	  **distances;		/* associated distances */
 } spgInnerConsistentOut;
 
 /*
@@ -167,7 +170,10 @@ typedef struct spgInnerConsistentOut
 typedef struct spgLeafConsistentIn
 {
 	ScanKey		scankeys;		/* array of operators and comparison values */
+	ScanKey		orderbykeys;	/* array of ordering operators and comparison
+								 * values */
 	int			nkeys;			/* length of array */
+	int			norderbys;
 
 	Datum		reconstructedValue; /* value reconstructed at parent */
 	void	   *traversalValue; /* opclass-specific traverse value */
@@ -181,6 +187,8 @@ typedef struct spgLeafConsistentOut
 {
 	Datum		leafValue;		/* reconstructed original data, if any */
 	bool		recheck;		/* set true if operator must be rechecked */
+	bool		recheckDistances; /* set true if distances must be rechecked */
+	double	   *distances;		/* associated distances */
 } spgLeafConsistentOut;
 
 
diff --git a/src/include/access/spgist_private.h b/src/include/access/spgist_private.h
index 99365c8..e1ad281 100644
--- a/src/include/access/spgist_private.h
+++ b/src/include/access/spgist_private.h
@@ -16,8 +16,10 @@
 
 #include "access/itup.h"
 #include "access/spgist.h"
+#include "lib/rbtree.h"
 #include "nodes/tidbitmap.h"
 #include "storage/buf.h"
+#include "storage/relfilenode.h"
 #include "utils/relcache.h"
 
 
@@ -130,12 +132,35 @@ typedef struct SpGistState
 	bool		isBuild;		/* true if doing index build */
 } SpGistState;
 
+/* One entry of the scan stack (unordered scans) or queue (ordered scans) */
+typedef struct SpGistSearchItem
+{
+	pairingheap_node phNode;	/* pairing heap node */
+	Datum		value;			/* value reconstructed from parent or
+								 * leafValue if heaptuple */
+	void	   *traversalValue; /* opclass-specific traverse value */
+	int			level;			/* level of items on this page */
+	ItemPointerData heapPtr;	/* heap info, if heap tuple */
+	bool		isNull;			/* SearchItem is NULL item */
+	bool		isLeaf;			/* SearchItem is heap item */
+	bool		recheckQual;	/* qual recheck is needed */
+	bool		recheckDist;	/* distance recheck is needed */
+	double		distances[FLEXIBLE_ARRAY_MEMBER];	/* numberOfOrderBys entries */
+} SpGistSearchItem;
+
+/* Size in bytes of a SpGistSearchItem with room for n_distances distances */
+#define SizeOfSpGistSearchItem(n_distances) \
+	(offsetof(SpGistSearchItem, distances) + sizeof(double) * (n_distances))
+
 /*
  * Private state of an index scan
  */
 typedef struct SpGistScanOpaqueData
 {
 	SpGistState state;			/* see above */
+	List	   *scanStack;		/* list of SpGistSearchItem (unordered scans) */
+	pairingheap *queue;			/* queue of unvisited items (ordered scans) */
+	MemoryContext queueCxt;		/* context holding the queue */
 	MemoryContext tempCxt;		/* short-lived memory context */
 	MemoryContext traversalCxt; /* memory context for traversalValues */
 
@@ -146,9 +171,17 @@ typedef struct SpGistScanOpaqueData
 	/* Index quals to be passed to opclass (null-related quals removed) */
 	int			numberOfKeys;	/* number of index qualifier conditions */
 	ScanKey		keyData;		/* array of index qualifier descriptors */
+	int			numberOfOrderBys;
+	ScanKey		orderByData;
+	Oid		   *orderByTypes;
+
+	FmgrInfo	innerConsistentFn;
+	FmgrInfo	leafConsistentFn;
+	Oid			indexCollation;
 
-	/* Stack of yet-to-be-visited pages */
-	List	   *scanStack;		/* List of ScanStackEntrys */
+	/* Pre-allocated workspace arrays: */
+	double	   *zeroDistances;
+	double	   *infDistances;
 
 	/* These fields are only used in amgetbitmap scans: */
 	TIDBitmap  *tbm;			/* bitmap being filled */
@@ -161,7 +194,9 @@ typedef struct SpGistScanOpaqueData
 	int			iPtr;			/* index for scanning through same */
 	ItemPointerData heapPtrs[MaxIndexTuplesPerPage];	/* TIDs from cur page */
 	bool		recheck[MaxIndexTuplesPerPage]; /* their recheck flags */
+	bool		recheckDist[MaxIndexTuplesPerPage]; /* distance recheck flags */
 	HeapTuple	reconTups[MaxIndexTuplesPerPage];	/* reconstructed tuples */
+	double	   *distances[MaxIndexTuplesPerPage];	/* distances (for recheck) */
 
 	/*
 	 * Note: using MaxIndexTuplesPerPage above is a bit hokey since
@@ -410,6 +445,9 @@ extern OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page,
 					 Item item, Size size,
 					 OffsetNumber *startOffset,
 					 bool errorOK);
+extern bool spgproperty(Oid index_oid, int attno,
+			IndexAMProperty prop, const char *propname,
+			bool *res, bool *isnull);
 
 /* spgdoinsert.c */
 extern void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
diff --git a/doc/src/sgml/spgist.sgml b/doc/src/sgml/spgist.sgml
index 5358bbb..187f1f4 100644
--- a/doc/src/sgml/spgist.sgml
+++ b/doc/src/sgml/spgist.sgml
@@ -64,12 +64,13 @@
 
   <table id="spgist-builtin-opclasses-table">
    <title>Built-in <acronym>SP-GiST</acronym> Operator Classes</title>
-   <tgroup cols="3">
+   <tgroup cols="4">
     <thead>
      <row>
       <entry>Name</entry>
       <entry>Indexed Data Type</entry>
       <entry>Indexable Operators</entry>
+      <entry>Ordering Operators</entry>
      </row>
     </thead>
     <tbody>
@@ -84,6 +85,9 @@
        <literal>&gt;^</literal>
        <literal>~=</literal>
       </entry>
+      <entry>
+       <literal>&lt;-&gt;</literal>
+      </entry>
      </row>
      <row>
       <entry><literal>quad_point_ops</literal></entry>
@@ -96,6 +100,9 @@
        <literal>&gt;^</literal>
        <literal>~=</literal>
       </entry>
+      <entry>
+       <literal>&lt;-&gt;</literal>
+      </entry>
      </row>
      <row>
       <entry><literal>range_ops</literal></entry>
@@ -111,6 +118,8 @@
        <literal>&gt;&gt;</literal>
        <literal>@&gt;</literal>
       </entry>
+      <entry>
+      </entry>
      </row>
      <row>
       <entry><literal>box_ops</literal></entry>
@@ -129,6 +138,8 @@
        <literal>|&gt;&gt;</literal>
        <literal>|&amp;&gt;</literal>
       </entry>
+      <entry>
+      </entry>
      </row>
      <row>
       <entry><literal>poly_ops</literal></entry>
@@ -147,6 +158,8 @@
        <literal>|&gt;&gt;</literal>
        <literal>|&amp;&gt;</literal>
       </entry>
+      <entry>
+      </entry>      
      </row>
      <row>
       <entry><literal>text_ops</literal></entry>
@@ -163,6 +176,8 @@
        <literal>~&gt;~</literal>
        <literal>^@</literal>
       </entry>
+      <entry>
+      </entry>
      </row>
      <row>
       <entry><literal>inet_ops</literal></entry>
@@ -180,6 +195,8 @@
        <literal>&lt;=</literal>
        <literal>=</literal>
       </entry>
+      <entry>
+      </entry>
      </row>
     </tbody>
    </tgroup>
@@ -191,6 +208,12 @@
   supports the same operators but uses a different index data structure which
   may offer better performance in some applications.
  </para>
+ <para>
+  The <literal>quad_point_ops</literal>, <literal>kd_point_ops</literal>
+  classes support the <literal>&lt;-&gt;</literal> ordering operator, which enables
+  the k-nearest neighbor (<literal>k-NN</literal>) search over indexed
+  point datasets.
+ </para>
 
 </sect1>
 
diff --git a/src/backend/access/spgist/Makefile b/src/backend/access/spgist/Makefile
index 14948a5..5be3df5 100644
--- a/src/backend/access/spgist/Makefile
+++ b/src/backend/access/spgist/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = spgutils.o spginsert.o spgscan.o spgvacuum.o spgvalidate.o \
 	spgdoinsert.o spgxlog.o \
-	spgtextproc.o spgquadtreeproc.o spgkdtreeproc.o
+	spgtextproc.o spgquadtreeproc.o spgkdtreeproc.o \
+	spgproc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/spgist/spgkdtreeproc.c b/src/backend/access/spgist/spgkdtreeproc.c
index 556f3a4..91403ff 100644
--- a/src/backend/access/spgist/spgkdtreeproc.c
+++ b/src/backend/access/spgist/spgkdtreeproc.c
@@ -17,6 +17,7 @@
 
 #include "access/spgist.h"
 #include "access/stratnum.h"
+#include "access/spgist_private.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
 #include "utils/geo_decls.h"
@@ -162,6 +163,7 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS)
 	double		coord;
 	int			which;
 	int			i;
+	BOX			boxes[2];
 
 	Assert(in->hasPrefix);
 	coord = DatumGetFloat8(in->prefixDatum);
@@ -248,12 +250,75 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS)
 	}
 
 	/* We must descend into the children identified by which */
-	out->nodeNumbers = (int *) palloc(sizeof(int) * 2);
 	out->nNodes = 0;
+
+	if (!which)
+		PG_RETURN_VOID();
+
+	out->nodeNumbers = (int *) palloc(sizeof(int) * 2);
+
+	if (in->norderbys > 0)
+	{
+		BOX			infArea;
+		BOX		   *area;
+
+		out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
+		out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
+
+		if (in->level == 0)
+		{
+			float8		inf = get_float8_infinity();
+
+			area = box_fill(&infArea, -inf, inf, -inf, inf);
+		}
+		else
+		{
+			area = (BOX *) in->traversalValue;
+			Assert(area);
+		}
+
+		boxes[0].low = area->low;
+		boxes[1].high = area->high;
+
+		if (in->level % 2)
+		{
+			/* split box by x */
+			boxes[0].high.x = boxes[1].low.x = coord;
+			boxes[0].high.y = area->high.y;
+			boxes[1].low.y = area->low.y;
+		}
+		else
+		{
+			/* split box by y */
+			boxes[0].high.y = boxes[1].low.y = coord;
+			boxes[0].high.x = area->high.x;
+			boxes[1].low.x = area->low.x;
+		}
+	}
+
 	for (i = 1; i <= 2; i++)
 	{
 		if (which & (1 << i))
-			out->nodeNumbers[out->nNodes++] = i - 1;
+		{
+			out->nodeNumbers[out->nNodes] = i - 1;
+
+			if (in->norderbys > 0)
+			{
+				MemoryContext oldCtx = MemoryContextSwitchTo(
+													in->traversalMemoryContext);
+				BOX		   *box = box_copy(&boxes[i - 1]);
+
+				MemoryContextSwitchTo(oldCtx);
+
+				out->traversalValues[out->nNodes] = box;
+
+				spg_point_distance(BoxPGetDatum(box),
+								   in->norderbys, in->orderbyKeys,
+								   &out->distances[out->nNodes], false);
+			}
+
+			out->nNodes++;
+		}
 	}
 
 	/* Set up level increments, too */
diff --git a/src/backend/access/spgist/spgproc.c b/src/backend/access/spgist/spgproc.c
new file mode 100644
index 0000000..23a8d09
--- /dev/null
+++ b/src/backend/access/spgist/spgproc.c
@@ -0,0 +1,68 @@
+/*-------------------------------------------------------------------------
+ *
+ * spgproc.c
+ *	  Common procedures for SP-GiST.
+ *
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *			src/backend/access/spgist/spgproc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/spgist_private.h"
+#include "utils/builtins.h"
+#include "utils/geo_decls.h"
+
+/*
+ * Distance from a point to an axis-aligned box; zero along any axis where
+ * the point lies within the box's extent.  Returns NaN if a coordinate of
+ * the point or of the box's low corner is NaN.
+ */
+static double
+point_box_distance(Point *point, BOX *box)
+{
+	double		xgap = 0.0;
+	double		ygap = 0.0;
+
+	if (isnan(point->x) || isnan(point->y) ||
+		isnan(box->low.x) || isnan(box->low.y))
+		return get_float8_nan();
+
+	if (point->x < box->low.x)
+		xgap = box->low.x - point->x;
+	else if (point->x > box->high.x)
+		xgap = point->x - box->high.x;
+
+	if (point->y < box->low.y)
+		ygap = box->low.y - point->y;
+	else if (point->y > box->high.y)
+		ygap = point->y - box->high.y;
+
+	return HYPOT(xgap, ygap);
+}
+
+/* Store in *distances a palloc'd array of distances from "to" to each key */
+void
+spg_point_distance(Datum to, int norderbys, ScanKey orderbyKeys,
+				   double **distances, bool isLeaf)
+{
+	double	   *result = (double *) palloc(norderbys * sizeof(double));
+	int			i;
+
+	*distances = result;
+	for (i = 0; i < norderbys; i++)
+	{
+		Point	   *key = DatumGetPointP(orderbyKeys[i].sk_argument);
+
+		result[i] = isLeaf ? point_dt(key, DatumGetPointP(to))
+						   : point_box_distance(key, DatumGetBoxP(to));
+	}
+}
diff --git a/src/backend/access/spgist/spgquadtreeproc.c b/src/backend/access/spgist/spgquadtreeproc.c
index 8700ff3..7883ad4 100644
--- a/src/backend/access/spgist/spgquadtreeproc.c
+++ b/src/backend/access/spgist/spgquadtreeproc.c
@@ -17,6 +17,7 @@
 
 #include "access/spgist.h"
 #include "access/stratnum.h"
+#include "access/spgist_private.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
 #include "utils/geo_decls.h"
@@ -77,6 +78,32 @@ getQuadrant(Point *centroid, Point *tst)
 	return 0;
 }
 
+/*
+ * Returns a palloc'd bounding box for quadrant 1..4 of "area" as divided at
+ * "centroid"; for any other quadrant number the box is left uninitialized.
+ */
+static BOX *
+getQuadrantArea(BOX *area, Point *centroid, int quadrant)
+{
+	BOX		   *result = (BOX *) palloc(sizeof(BOX));
+
+	if (quadrant == 1)
+	{
+		result->high = area->high;
+		result->low = *centroid;
+	}
+	else if (quadrant == 2)
+		box_fill(result, centroid->x, area->high.x, area->low.y, centroid->y);
+	else if (quadrant == 3)
+	{
+		result->high = *centroid;
+		result->low = area->low;
+	}
+	else if (quadrant == 4)
+		box_fill(result, area->low.x, centroid->x, centroid->y, area->high.y);
+
+	return result;
+}
 
 Datum
 spg_quad_choose(PG_FUNCTION_ARGS)
@@ -196,19 +223,56 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS)
 	spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
 	spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
 	Point	   *centroid;
+	BOX			infArea;
+	BOX		   *area = NULL;
 	int			which;
 	int			i;
 
 	Assert(in->hasPrefix);
 	centroid = DatumGetPointP(in->prefixDatum);
 
+	if (in->norderbys > 0)
+	{
+		out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
+		out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
+
+		if (in->level == 0)
+		{
+			double		inf = get_float8_infinity();
+
+			area = box_fill(&infArea, -inf, inf, -inf, inf);
+		}
+		else
+		{
+			area = in->traversalValue;
+			Assert(area);
+		}
+	}
+
 	if (in->allTheSame)
 	{
 		/* Report that all nodes should be visited */
 		out->nNodes = in->nNodes;
 		out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
 		for (i = 0; i < in->nNodes; i++)
+		{
 			out->nodeNumbers[i] = i;
+
+			if (in->norderbys > 0)
+			{
+				MemoryContext oldCtx = MemoryContextSwitchTo(
+													in->traversalMemoryContext);
+				/* Use parent quadrant box as traversalValue */
+				BOX		   *quadrant = box_copy(area);
+
+				MemoryContextSwitchTo(oldCtx);
+
+				out->traversalValues[i] = quadrant;
+				spg_point_distance(BoxPGetDatum(quadrant),
+								   in->norderbys, in->orderbyKeys,
+								   &out->distances[i], false);
+			}
+		}
 		PG_RETURN_VOID();
 	}
 
@@ -286,13 +350,37 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS)
 			break;				/* no need to consider remaining conditions */
 	}
 
+	out->levelAdds = palloc(sizeof(int) * 4);
+	for (i = 0; i < 4; ++i)
+		out->levelAdds[i] = 1;
+
 	/* We must descend into the quadrant(s) identified by which */
 	out->nodeNumbers = (int *) palloc(sizeof(int) * 4);
 	out->nNodes = 0;
+
 	for (i = 1; i <= 4; i++)
 	{
 		if (which & (1 << i))
-			out->nodeNumbers[out->nNodes++] = i - 1;
+		{
+			out->nodeNumbers[out->nNodes] = i - 1;
+
+			if (in->norderbys > 0)
+			{
+				MemoryContext oldCtx = MemoryContextSwitchTo(
+													in->traversalMemoryContext);
+				BOX		   *quadrant = getQuadrantArea(area, centroid, i);
+
+				MemoryContextSwitchTo(oldCtx);
+
+				out->traversalValues[out->nNodes] = quadrant;
+
+				spg_point_distance(BoxPGetDatum(quadrant),
+								   in->norderbys, in->orderbyKeys,
+								   &out->distances[out->nNodes], false);
+			}
+
+			out->nNodes++;
+		}
 	}
 
 	PG_RETURN_VOID();
@@ -356,5 +444,10 @@ spg_quad_leaf_consistent(PG_FUNCTION_ARGS)
 			break;
 	}
 
+	if (res && in->norderbys > 0)
+		/* ok, it passes -> let's compute the distances */
+		spg_point_distance(in->leafDatum,
+					  in->norderbys, in->orderbykeys, &out->distances, true);
+
 	PG_RETURN_BOOL(res);
 }
diff --git a/src/include/access/spgist_private.h b/src/include/access/spgist_private.h
index e1ad281..4c1adf2 100644
--- a/src/include/access/spgist_private.h
+++ b/src/include/access/spgist_private.h
@@ -459,4 +459,8 @@ extern void spgPageIndexMultiDelete(SpGistState *state, Page page,
 extern bool spgdoinsert(Relation index, SpGistState *state,
 			ItemPointer heapPtr, Datum datum, bool isnull);
 
+/* spgproc.c */
+extern void spg_point_distance(Datum to, int norderbys,
+				   ScanKey orderbyKeys, double **distances, bool isLeaf);
+
 #endif							/* SPGIST_PRIVATE_H */
diff --git a/src/include/catalog/pg_amop.dat b/src/include/catalog/pg_amop.dat
index fb58f77..f8f149e 100644
--- a/src/include/catalog/pg_amop.dat
+++ b/src/include/catalog/pg_amop.dat
@@ -1401,6 +1401,10 @@
 { amopfamily => 'spgist/quad_point_ops', amoplefttype => 'point',
   amoprighttype => 'box', amopstrategy => '8', amopopr => '<@(point,box)',
   amopmethod => 'spgist' },
+{ amopfamily => 'spgist/quad_point_ops', amoplefttype => 'point',
+  amoprighttype => 'point', amopstrategy => '15', amopopr => '<->(point,point)',
+  amopmethod => 'spgist', amoppurpose => 'o',
+  amopsortfamily => 'btree/float_ops' },
 
 # SP-GiST kd_point_ops
 { amopfamily => 'spgist/kd_point_ops', amoplefttype => 'point',
@@ -1421,6 +1425,10 @@
 { amopfamily => 'spgist/kd_point_ops', amoplefttype => 'point',
   amoprighttype => 'box', amopstrategy => '8', amopopr => '<@(point,box)',
   amopmethod => 'spgist' },
+{ amopfamily => 'spgist/kd_point_ops', amoplefttype => 'point',
+  amoprighttype => 'point', amopstrategy => '15', amopopr => '<->(point,point)',
+  amopmethod => 'spgist', amoppurpose => 'o',
+  amopsortfamily => 'btree/float_ops' },
 
 # SP-GiST text_ops
 { amopfamily => 'spgist/text_ops', amoplefttype => 'text',
diff --git a/src/test/regress/expected/amutils.out b/src/test/regress/expected/amutils.out
index 24cd3c5..4570a39 100644
--- a/src/test/regress/expected/amutils.out
+++ b/src/test/regress/expected/amutils.out
@@ -83,7 +83,8 @@ select prop,
        pg_index_column_has_property('onek_hundred'::regclass, 1, prop) as btree,
        pg_index_column_has_property('hash_i4_index'::regclass, 1, prop) as hash,
        pg_index_column_has_property('gcircleind'::regclass, 1, prop) as gist,
-       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist,
+       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist_radix,
+       pg_index_column_has_property('sp_quad_ind'::regclass, 1, prop) as spgist_quad,
        pg_index_column_has_property('botharrayidx'::regclass, 1, prop) as gin,
        pg_index_column_has_property('brinidx'::regclass, 1, prop) as brin
   from unnest(array['asc', 'desc', 'nulls_first', 'nulls_last',
@@ -92,18 +93,18 @@ select prop,
                     'bogus']::text[])
          with ordinality as u(prop,ord)
  order by ord;
-        prop        | btree | hash | gist | spgist | gin | brin 
---------------------+-------+------+------+--------+-----+------
- asc                | t     | f    | f    | f      | f   | f
- desc               | f     | f    | f    | f      | f   | f
- nulls_first        | f     | f    | f    | f      | f   | f
- nulls_last         | t     | f    | f    | f      | f   | f
- orderable          | t     | f    | f    | f      | f   | f
- distance_orderable | f     | f    | t    | f      | f   | f
- returnable         | t     | f    | f    | t      | f   | f
- search_array       | t     | f    | f    | f      | f   | f
- search_nulls       | t     | f    | t    | t      | f   | t
- bogus              |       |      |      |        |     | 
+        prop        | btree | hash | gist | spgist_radix | spgist_quad | gin | brin 
+--------------------+-------+------+------+--------------+-------------+-----+------
+ asc                | t     | f    | f    | f            | f           | f   | f
+ desc               | f     | f    | f    | f            | f           | f   | f
+ nulls_first        | f     | f    | f    | f            | f           | f   | f
+ nulls_last         | t     | f    | f    | f            | f           | f   | f
+ orderable          | t     | f    | f    | f            | f           | f   | f
+ distance_orderable | f     | f    | t    | f            | t           | f   | f
+ returnable         | t     | f    | f    | t            | t           | f   | f
+ search_array       | t     | f    | f    | f            | f           | f   | f
+ search_nulls       | t     | f    | t    | t            | t           | f   | t
+ bogus              |       |      |      |              |             |     | 
 (10 rows)
 
 select prop,
diff --git a/src/test/regress/expected/create_index.out b/src/test/regress/expected/create_index.out
index fc81088..b513489 100644
--- a/src/test/regress/expected/create_index.out
+++ b/src/test/regress/expected/create_index.out
@@ -294,6 +294,15 @@ SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
      1
 (1 row)
 
+CREATE TEMP TABLE quad_point_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+CREATE TEMP TABLE quad_point_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+CREATE TEMP TABLE quad_point_tbl_ord_seq3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
  count 
 -------
@@ -889,6 +898,74 @@ SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 (1 row)
 
 EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Order By: (p <-> '(0,0)'::point)
+(3 rows)
+
+CREATE TEMP TABLE quad_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN quad_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+   n   | dist | p |   n   | dist | p 
+-------+------+---+-------+------+---
+ 11001 |      |   |       |      | 
+ 11001 |      |   |       |      | 
+ 11001 |      |   |       |      | 
+       |      |   | 11001 |      | 
+       |      |   | 11001 |      | 
+       |      |   | 11001 |      | 
+(6 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Index Cond: (p <@ '(1000,1000),(200,200)'::box)
+         Order By: (p <-> '(0,0)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN quad_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Index Cond: (p IS NOT NULL)
+         Order By: (p <-> '(333,400)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN quad_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
 SELECT count(*) FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
                        QUERY PLAN                        
 ---------------------------------------------------------
@@ -994,6 +1071,71 @@ SELECT count(*) FROM kd_point_tbl WHERE p ~= '(4585, 365)';
 (1 row)
 
 EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+                      QUERY PLAN                       
+-------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_kd_ind on kd_point_tbl
+         Order By: (p <-> '(0,0)'::point)
+(3 rows)
+
+CREATE TEMP TABLE kd_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN kd_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+                       QUERY PLAN                        
+---------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_kd_ind on kd_point_tbl
+         Index Cond: (p <@ '(1000,1000),(200,200)'::box)
+         Order By: (p <-> '(0,0)'::point)
+(4 rows)
+
+CREATE TEMP TABLE kd_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN kd_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+                      QUERY PLAN                       
+-------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_kd_ind on kd_point_tbl
+         Index Cond: (p IS NOT NULL)
+         Order By: (p <-> '(333,400)'::point)
+(4 rows)
+
+CREATE TEMP TABLE kd_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN kd_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
                          QUERY PLAN                         
 ------------------------------------------------------------
diff --git a/src/test/regress/expected/opr_sanity.out b/src/test/regress/expected/opr_sanity.out
index a1e18a6..1efea24 100644
--- a/src/test/regress/expected/opr_sanity.out
+++ b/src/test/regress/expected/opr_sanity.out
@@ -1876,6 +1876,7 @@ ORDER BY 1, 2, 3;
        4000 |           12 | <=
        4000 |           12 | |&>
        4000 |           14 | >=
+       4000 |           15 | <->
        4000 |           15 | >
        4000 |           16 | @>
        4000 |           18 | =
@@ -1889,7 +1890,7 @@ ORDER BY 1, 2, 3;
        4000 |           26 | >>
        4000 |           27 | >>=
        4000 |           28 | ^@
-(122 rows)
+(123 rows)
 
 -- Check that all opclass search operators have selectivity estimators.
 -- This is not absolutely required, but it seems a reasonable thing
diff --git a/src/test/regress/sql/amutils.sql b/src/test/regress/sql/amutils.sql
index 8ca85ec..06e7fa1 100644
--- a/src/test/regress/sql/amutils.sql
+++ b/src/test/regress/sql/amutils.sql
@@ -40,7 +40,8 @@ select prop,
        pg_index_column_has_property('onek_hundred'::regclass, 1, prop) as btree,
        pg_index_column_has_property('hash_i4_index'::regclass, 1, prop) as hash,
        pg_index_column_has_property('gcircleind'::regclass, 1, prop) as gist,
-       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist,
+       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist_radix,
+       pg_index_column_has_property('sp_quad_ind'::regclass, 1, prop) as spgist_quad,
        pg_index_column_has_property('botharrayidx'::regclass, 1, prop) as gin,
        pg_index_column_has_property('brinidx'::regclass, 1, prop) as brin
   from unnest(array['asc', 'desc', 'nulls_first', 'nulls_last',
diff --git a/src/test/regress/sql/create_index.sql b/src/test/regress/sql/create_index.sql
index f9e7118..f4418d5 100644
--- a/src/test/regress/sql/create_index.sql
+++ b/src/test/regress/sql/create_index.sql
@@ -198,6 +198,18 @@ SELECT count(*) FROM quad_point_tbl WHERE p >^ '(5000, 4000)';
 
 SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 
+CREATE TEMP TABLE quad_point_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+
+CREATE TEMP TABLE quad_point_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+
+CREATE TEMP TABLE quad_point_tbl_ord_seq3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
 
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcde';
@@ -364,6 +376,36 @@ SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 
 EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+CREATE TEMP TABLE quad_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN quad_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+CREATE TEMP TABLE quad_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN quad_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+CREATE TEMP TABLE quad_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN quad_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+
+EXPLAIN (COSTS OFF)
 SELECT count(*) FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
 SELECT count(*) FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
 
@@ -392,6 +434,39 @@ SELECT count(*) FROM kd_point_tbl WHERE p ~= '(4585, 365)';
 SELECT count(*) FROM kd_point_tbl WHERE p ~= '(4585, 365)';
 
 EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+CREATE TEMP TABLE kd_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN kd_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+CREATE TEMP TABLE kd_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN kd_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+CREATE TEMP TABLE kd_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN kd_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
+EXPLAIN (COSTS OFF)
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
 
diff --git a/doc/src/sgml/spgist.sgml b/doc/src/sgml/spgist.sgml
index 187f1f4..18be8f0 100644
--- a/doc/src/sgml/spgist.sgml
+++ b/doc/src/sgml/spgist.sgml
@@ -159,6 +159,7 @@
        <literal>|&amp;&gt;</literal>
       </entry>
       <entry>
+        <literal>&lt;-&gt;</literal>
       </entry>      
      </row>
      <row>
@@ -209,10 +210,11 @@
   may offer better performance in some applications.
  </para>
  <para>
-  The <literal>quad_point_ops</literal>, <literal>kd_point_ops</literal>
+  The <literal>quad_point_ops</literal>, <literal>kd_point_ops</literal>,
+  and <literal>poly_ops</literal> operator
   classes support the <literal>&lt;-&gt;</literal> ordering operator, which enables
   the k-nearest neighbor (<literal>k-NN</literal>) search over indexed
-  point datasets.
+  point or polygon datasets.
  </para>
 
 </sect1>
diff --git a/src/backend/utils/adt/geo_spgist.c b/src/backend/utils/adt/geo_spgist.c
index 06411ae..e0aed47 100644
--- a/src/backend/utils/adt/geo_spgist.c
+++ b/src/backend/utils/adt/geo_spgist.c
@@ -74,9 +74,11 @@
 #include "postgres.h"
 
 #include "access/spgist.h"
+#include "access/spgist_private.h"
 #include "access/stratnum.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/geo_decls.h"
 
 /*
@@ -366,6 +368,31 @@ overAbove4D(RectBox *rect_box, RangeBox *query)
 	return overHigher2D(&rect_box->range_box_y, &query->right);
 }
 
+/* Lower bound for distance from point to rect_box (0 if within its extent) */
+static double
+pointToRectBoxDistance(Point *point, RectBox *rect_box)
+{
+	double		dx;
+	double		dy;
+
+	if (point->x < rect_box->range_box_x.left.low)
+		dx = rect_box->range_box_x.left.low - point->x;
+	else if (point->x > rect_box->range_box_x.right.high)
+		dx = point->x - rect_box->range_box_x.right.high;
+	else
+		dx = 0;			/* x is not outside [left.low, right.high] */
+
+	if (point->y < rect_box->range_box_y.left.low)
+		dy = rect_box->range_box_y.left.low - point->y;
+	else if (point->y > rect_box->range_box_y.right.high)
+		dy = point->y - rect_box->range_box_y.right.high;
+	else
+		dy = 0;			/* y is not outside [left.low, right.high] */
+
+	return HYPOT(dx, dy);	/* combine the per-axis gaps into one distance */
+}
+
+
 /*
  * SP-GiST config function
  */
@@ -533,6 +560,15 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
 	RangeBox   *centroid,
 			  **queries;
 
+	/*
+	 * We are reusing the traversal value passed in, or initializing an
+	 * unbounded one if we have just begun to walk the tree.
+	 */
+	if (in->traversalValue)
+		rect_box = in->traversalValue;
+	else
+		rect_box = initRectBox();
+
 	if (in->allTheSame)
 	{
 		/* Report that all nodes should be visited */
@@ -541,19 +577,33 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
 		for (i = 0; i < in->nNodes; i++)
 			out->nodeNumbers[i] = i;
 
+		if (in->norderbys > 0 && in->nNodes > 0)
+		{
+			double	   *distances = palloc(sizeof(double) * in->norderbys);
+			int			j;
+
+			for (j = 0; j < in->norderbys; j++)
+			{
+				Point	   *pt = DatumGetPointP(in->orderbyKeys[j].sk_argument);
+
+				distances[j] = pointToRectBoxDistance(pt, rect_box);
+			}
+
+			out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
+			out->distances[0] = distances;
+
+			for (i = 1; i < in->nNodes; i++)
+			{
+				out->distances[i] = palloc(sizeof(double) * in->norderbys);
+				memcpy(out->distances[i], distances,
+					   sizeof(double) * in->norderbys);
+			}
+		}
+
 		PG_RETURN_VOID();
 	}
 
 	/*
-	 * We are saving the traversal value or initialize it an unbounded one, if
-	 * we have just begun to walk the tree.
-	 */
-	if (in->traversalValue)
-		rect_box = in->traversalValue;
-	else
-		rect_box = initRectBox();
-
-	/*
 	 * We are casting the prefix and queries to RangeBoxes for ease of the
 	 * following operations.
 	 */
@@ -570,6 +620,8 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
 	out->nNodes = 0;
 	out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
 	out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
+	if (in->norderbys > 0)
+		out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
 
 	/*
 	 * We switch memory context, because we want to allocate memory for new
@@ -647,6 +699,22 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
 		{
 			out->traversalValues[out->nNodes] = next_rect_box;
 			out->nodeNumbers[out->nNodes] = quadrant;
+
+			if (in->norderbys > 0)
+			{
+				double	   *distances = palloc(sizeof(double) * in->norderbys);
+				int			j;
+
+				out->distances[out->nNodes] = distances;
+
+				for (j = 0; j < in->norderbys; j++)
+				{
+					Point *pt = DatumGetPointP(in->orderbyKeys[j].sk_argument);
+
+					distances[j] = pointToRectBoxDistance(pt, next_rect_box);
+				}
+			}
+
 			out->nNodes++;
 		}
 		else
@@ -762,6 +830,17 @@ spg_box_quad_leaf_consistent(PG_FUNCTION_ARGS)
 			break;
 	}
 
+	if (flag && in->norderbys > 0)
+	{
+		Oid			distfnoid = in->orderbykeys[0].sk_func.fn_oid;
+
+		spg_point_distance(leaf, in->norderbys, in->orderbykeys,
+						   &out->distances, false);
+
+		/* Recheck is necessary when computing distance to polygon */
+		out->recheckDistances = distfnoid == F_DIST_POLYP;
+	}
+
 	PG_RETURN_BOOL(flag);
 }
 
diff --git a/src/include/catalog/pg_amop.dat b/src/include/catalog/pg_amop.dat
index f8f149e..5f85e95 100644
--- a/src/include/catalog/pg_amop.dat
+++ b/src/include/catalog/pg_amop.dat
@@ -1598,6 +1598,10 @@
 { amopfamily => 'spgist/poly_ops', amoplefttype => 'polygon',
   amoprighttype => 'polygon', amopstrategy => '12',
   amopopr => '|&>(polygon,polygon)', amopmethod => 'spgist' },
+{ amopfamily => 'spgist/poly_ops', amoplefttype => 'polygon',
+  amoprighttype => 'point', amopstrategy => '15',
+  amopopr => '<->(polygon,point)', amoppurpose => 'o', amopmethod => 'spgist',
+  amopsortfamily => 'btree/float_ops' },
 
 # GiST inet_ops
 { amopfamily => 'gist/network_ops', amoplefttype => 'inet',
diff --git a/src/test/regress/expected/polygon.out b/src/test/regress/expected/polygon.out
index 4a1f604..cd8c98b 100644
--- a/src/test/regress/expected/polygon.out
+++ b/src/test/regress/expected/polygon.out
@@ -462,6 +462,54 @@ SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(2
   1000
 (1 row)
 
+-- test ORDER BY distance
+SET enable_indexscan = ON;
+SET enable_bitmapscan = OFF;
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Scan using quad_poly_tbl_idx on quad_poly_tbl
+         Order By: (p <-> '(123,456)'::point)
+(3 rows)
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+SELECT *
+FROM quad_poly_tbl_ord_seq1 seq FULL JOIN quad_poly_tbl_ord_idx1 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+ n | dist | id | n | dist | id 
+---+------+----+---+------+----
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+                                   QUERY PLAN                                    
+---------------------------------------------------------------------------------
+ WindowAgg
+   ->  Index Scan using quad_poly_tbl_idx on quad_poly_tbl
+         Index Cond: (p <@ '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         Order By: (p <-> '(123,456)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT *
+FROM quad_poly_tbl_ord_seq2 seq FULL JOIN quad_poly_tbl_ord_idx2 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+ n | dist | id | n | dist | id 
+---+------+----+---+------+----
+(0 rows)
+
 RESET enable_seqscan;
 RESET enable_indexscan;
 RESET enable_bitmapscan;
diff --git a/src/test/regress/sql/polygon.sql b/src/test/regress/sql/polygon.sql
index 7e8cb08..ba86669 100644
--- a/src/test/regress/sql/polygon.sql
+++ b/src/test/regress/sql/polygon.sql
@@ -206,6 +206,39 @@ EXPLAIN (COSTS OFF)
 SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(230, 290))';
 SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(230, 290))';
 
+-- test ORDER BY distance
+SET enable_indexscan = ON;
+SET enable_bitmapscan = OFF;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+
+SELECT *
+FROM quad_poly_tbl_ord_seq1 seq FULL JOIN quad_poly_tbl_ord_idx1 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+
+SELECT *
+FROM quad_poly_tbl_ord_seq2 seq FULL JOIN quad_poly_tbl_ord_idx2 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+
 RESET enable_seqscan;
 RESET enable_indexscan;
 RESET enable_bitmapscan;

Reply via email to