Attached v02 version (rebased to HEAD).

--
Nikita Glukhov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/src/backend/access/gist/gistutil.c b/src/backend/access/gist/gistutil.c
index f92baed..1dfaba2 100644
--- a/src/backend/access/gist/gistutil.c
+++ b/src/backend/access/gist/gistutil.c
@@ -23,6 +23,7 @@
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
 #include "utils/syscache.h"
+#include "utils/lsyscache.h"
 
 
 /*
@@ -851,12 +852,6 @@ gistproperty(Oid index_oid, int attno,
 			 IndexAMProperty prop, const char *propname,
 			 bool *res, bool *isnull)
 {
-	HeapTuple	tuple;
-	Form_pg_index rd_index PG_USED_FOR_ASSERTS_ONLY;
-	Form_pg_opclass rd_opclass;
-	Datum		datum;
-	bool		disnull;
-	oidvector  *indclass;
 	Oid			opclass,
 				opfamily,
 				opcintype;
@@ -890,49 +885,28 @@ gistproperty(Oid index_oid, int attno,
 	}
 
 	/* First we need to know the column's opclass. */
-
-	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
-	if (!HeapTupleIsValid(tuple))
+	opclass = get_index_column_opclass(index_oid, attno);
+	if (!OidIsValid(opclass))
 	{
 		*isnull = true;
 		return true;
 	}
-	rd_index = (Form_pg_index) GETSTRUCT(tuple);
-
-	/* caller is supposed to guarantee this */
-	Assert(attno > 0 && attno <= rd_index->indnatts);
-
-	datum = SysCacheGetAttr(INDEXRELID, tuple,
-							Anum_pg_index_indclass, &disnull);
-	Assert(!disnull);
-
-	indclass = ((oidvector *) DatumGetPointer(datum));
-	opclass = indclass->values[attno - 1];
-
-	ReleaseSysCache(tuple);
 
 	/* Now look up the opclass family and input datatype. */
-
-	tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
-	if (!HeapTupleIsValid(tuple))
+	if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
 	{
 		*isnull = true;
 		return true;
 	}
-	rd_opclass = (Form_pg_opclass) GETSTRUCT(tuple);
-
-	opfamily = rd_opclass->opcfamily;
-	opcintype = rd_opclass->opcintype;
-
-	ReleaseSysCache(tuple);
 
 	/* And now we can check whether the function is provided. */
-
 	*res = SearchSysCacheExists4(AMPROCNUM,
 								 ObjectIdGetDatum(opfamily),
 								 ObjectIdGetDatum(opcintype),
 								 ObjectIdGetDatum(opcintype),
 								 Int16GetDatum(procno));
+	*isnull = false;
+
 	return true;
 }
 
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index 1b04c09..1d479fe 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -1050,6 +1050,32 @@ get_opclass_input_type(Oid opclass)
 	return result;
 }
 
+/*
+ * get_opclass_family_and_input_type
+ *
+ *		Returns the OID of the operator family the opclass belongs to,
+ *				the OID of the datatype the opclass indexes
+ */
+bool
+get_opclass_opfamily_and_input_type(Oid opclass, Oid *opfamily, Oid *opcintype)
+{
+	HeapTuple	tp;
+	Form_pg_opclass cla_tup;
+
+	tp = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
+	if (!HeapTupleIsValid(tp))
+		return false;
+
+	cla_tup = (Form_pg_opclass) GETSTRUCT(tp);
+
+	*opfamily = cla_tup->opcfamily;
+	*opcintype = cla_tup->opcintype;
+
+	ReleaseSysCache(tp);
+
+	return true;
+}
+
 /*				---------- OPERATOR CACHE ----------					 */
 
 /*
@@ -3061,3 +3087,45 @@ get_range_subtype(Oid rangeOid)
 	else
 		return InvalidOid;
 }
+
+/*				---------- PG_INDEX CACHE ----------				 */
+
+/*
+ * get_index_column_opclass
+ *
+ *		Given the index OID and column number,
+ *		return opclass of the index column
+ *			or InvalidOid if the index was not found.
+ */
+Oid
+get_index_column_opclass(Oid index_oid, int attno)
+{
+	HeapTuple	tuple;
+	Form_pg_index rd_index PG_USED_FOR_ASSERTS_ONLY;
+	Datum		datum;
+	bool		isnull;
+	oidvector  *indclass;
+	Oid			opclass;
+
+	/* First we need to know the column's opclass. */
+
+	tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
+	if (!HeapTupleIsValid(tuple))
+		return InvalidOid;
+
+	rd_index = (Form_pg_index) GETSTRUCT(tuple);
+
+	/* caller is supposed to guarantee this */
+	Assert(attno > 0 && attno <= rd_index->indnatts);
+
+	datum = SysCacheGetAttr(INDEXRELID, tuple,
+							Anum_pg_index_indclass, &isnull);
+	Assert(!isnull);
+
+	indclass = ((oidvector *) DatumGetPointer(datum));
+	opclass = indclass->values[attno - 1];
+
+	ReleaseSysCache(tuple);
+
+	return opclass;
+}
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index b6d1fca..618c4e8 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -73,6 +73,8 @@ extern char *get_constraint_name(Oid conoid);
 extern char *get_language_name(Oid langoid, bool missing_ok);
 extern Oid	get_opclass_family(Oid opclass);
 extern Oid	get_opclass_input_type(Oid opclass);
+extern bool get_opclass_opfamily_and_input_type(Oid opclass,
+									Oid *opfamily, Oid *opcintype);
 extern RegProcedure get_opcode(Oid opno);
 extern char *get_opname(Oid opno);
 extern Oid	get_op_rettype(Oid opno);
@@ -159,6 +161,7 @@ extern void free_attstatsslot(Oid atttype,
 extern char *get_namespace_name(Oid nspid);
 extern char *get_namespace_name_or_temp(Oid nspid);
 extern Oid	get_range_subtype(Oid rangeOid);
+extern Oid	get_index_column_opclass(Oid index_oid, int attno);
 
 #define type_is_array(typid)  (get_element_type(typid) != InvalidOid)
 /* type_is_array_domain accepts both plain arrays and domains over arrays */
diff --git a/src/backend/utils/adt/geo_ops.c b/src/backend/utils/adt/geo_ops.c
index 655b81c..e1814ab 100644
--- a/src/backend/utils/adt/geo_ops.c
+++ b/src/backend/utils/adt/geo_ops.c
@@ -41,8 +41,6 @@ enum path_delim
 static int	point_inside(Point *p, int npts, Point *plist);
 static int	lseg_crossing(double x, double y, double px, double py);
 static BOX *box_construct(double x1, double x2, double y1, double y2);
-static BOX *box_copy(BOX *box);
-static BOX *box_fill(BOX *result, double x1, double x2, double y1, double y2);
 static bool box_ov(BOX *box1, BOX *box2);
 static double box_ht(BOX *box);
 static double box_wd(BOX *box);
@@ -452,7 +450,7 @@ box_construct(double x1, double x2, double y1, double y2)
 
 /*		box_fill		-		fill in a given box struct
  */
-static BOX *
+BOX *
 box_fill(BOX *result, double x1, double x2, double y1, double y2)
 {
 	if (x1 > x2)
@@ -482,8 +480,8 @@ box_fill(BOX *result, double x1, double x2, double y1, double y2)
 
 /*		box_copy		-		copy a box
  */
-static BOX *
-box_copy(BOX *box)
+BOX *
+box_copy(const BOX *box)
 {
 	BOX		   *result = (BOX *) palloc(sizeof(BOX));
 
diff --git a/src/include/utils/geo_decls.h b/src/include/utils/geo_decls.h
index 9b530db..a514af2 100644
--- a/src/include/utils/geo_decls.h
+++ b/src/include/utils/geo_decls.h
@@ -183,4 +183,8 @@ extern double point_dt(Point *pt1, Point *pt2);
 extern double point_sl(Point *pt1, Point *pt2);
 extern double pg_hypot(double x, double y);
 
+/* private box routines */
+extern BOX *box_fill(BOX *box, double xlo, double xhi, double ylo, double yhi);
+extern BOX *box_copy(const BOX *box);
+
 #endif   /* GEO_DECLS_H */
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index eea366b..d1ba2b6 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -14,9 +14,9 @@
  */
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/gist_private.h"
 #include "access/relscan.h"
-#include "catalog/pg_type.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "lib/pairingheap.h"
@@ -538,7 +538,6 @@ getNextNearest(IndexScanDesc scan)
 {
 	GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
 	bool		res = false;
-	int			i;
 
 	if (scan->xs_itup)
 	{
@@ -559,45 +558,10 @@ getNextNearest(IndexScanDesc scan)
 			/* found a heap item at currently minimal distance */
 			scan->xs_ctup.t_self = item->data.heap.heapPtr;
 			scan->xs_recheck = item->data.heap.recheck;
-			scan->xs_recheckorderby = item->data.heap.recheckDistances;
-			for (i = 0; i < scan->numberOfOrderBys; i++)
-			{
-				if (so->orderByTypes[i] == FLOAT8OID)
-				{
-#ifndef USE_FLOAT8_BYVAL
-					/* must free any old value to avoid memory leakage */
-					if (!scan->xs_orderbynulls[i])
-						pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
-#endif
-					scan->xs_orderbyvals[i] = Float8GetDatum(item->distances[i]);
-					scan->xs_orderbynulls[i] = false;
-				}
-				else if (so->orderByTypes[i] == FLOAT4OID)
-				{
-					/* convert distance function's result to ORDER BY type */
-#ifndef USE_FLOAT4_BYVAL
-					/* must free any old value to avoid memory leakage */
-					if (!scan->xs_orderbynulls[i])
-						pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
-#endif
-					scan->xs_orderbyvals[i] = Float4GetDatum((float4) item->distances[i]);
-					scan->xs_orderbynulls[i] = false;
-				}
-				else
-				{
-					/*
-					 * If the ordering operator's return value is anything
-					 * else, we don't know how to convert the float8 bound
-					 * calculated by the distance function to that.  The
-					 * executor won't actually need the order by values we
-					 * return here, if there are no lossy results, so only
-					 * insist on converting if the *recheck flag is set.
-					 */
-					if (scan->xs_recheckorderby)
-						elog(ERROR, "GiST operator family's FOR ORDER BY operator must return float8 or float4 if the distance function is lossy");
-					scan->xs_orderbynulls[i] = true;
-				}
-			}
+
+			index_store_orderby_distances(scan, so->orderByTypes,
+										  item->distances,
+										  item->data.heap.recheckDistances);
 
 			/* in an index-only scan, also return the reconstructed tuple. */
 			if (scan->xs_want_itup)
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index ba27c1e..8dd06c1 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -75,6 +75,7 @@
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/index.h"
+#include "catalog/pg_type.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
@@ -896,3 +897,72 @@ index_getprocinfo(Relation irel,
 
 	return locinfo;
 }
+
+/* ----------------
+ *		index_store_orderby_distances
+ *
+ *		Convert AM distance function's results (that can be inexact)
+ *		to ORDER BY types and save them into xs_orderbyvals/xs_orderbynulls
+ *		for a possible recheck.
+ * ----------------
+ */
+void
+index_store_orderby_distances(IndexScanDesc scan, Oid *orderByTypes,
+							  double *distances, bool recheckOrderBy)
+{
+	int			i;
+
+	scan->xs_recheckorderby = recheckOrderBy;
+
+	if (!distances)
+	{
+		Assert(!scan->xs_recheckorderby);
+
+		for (i = 0; i < scan->numberOfOrderBys; i++)
+		{
+			scan->xs_orderbyvals[i] = (Datum) 0;
+			scan->xs_orderbynulls[i] = true;
+		}
+
+		return;
+	}
+
+	for (i = 0; i < scan->numberOfOrderBys; i++)
+	{
+		if (orderByTypes[i] == FLOAT8OID)
+		{
+#ifndef USE_FLOAT8_BYVAL
+			/* must free any old value to avoid memory leakage */
+			if (!scan->xs_orderbynulls[i])
+				pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
+#endif
+			scan->xs_orderbyvals[i] = Float8GetDatum(distances[i]);
+			scan->xs_orderbynulls[i] = false;
+		}
+		else if (orderByTypes[i] == FLOAT4OID)
+		{
+			/* convert distance function's result to ORDER BY type */
+#ifndef USE_FLOAT4_BYVAL
+			/* must free any old value to avoid memory leakage */
+			if (!scan->xs_orderbynulls[i])
+				pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
+#endif
+			scan->xs_orderbyvals[i] = Float4GetDatum((float4) distances[i]);
+			scan->xs_orderbynulls[i] = false;
+		}
+		else
+		{
+			/*
+			 * If the ordering operator's return value is anything
+			 * else, we don't know how to convert the float8 bound
+			 * calculated by the distance function to that.  The
+			 * executor won't actually need the order by values we
+			 * return here, if there are no lossy results, so only
+			 * insist on converting if the *recheck flag is set.
+			 */
+			if (scan->xs_recheckorderby)
+				elog(ERROR, "GiST operator family's FOR ORDER BY operator must return float8 or float4 if the distance function is lossy");
+			scan->xs_orderbynulls[i] = true;
+		}
+	}
+}
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index 51466b9..fe0f759 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -170,6 +170,8 @@ extern RegProcedure index_getprocid(Relation irel, AttrNumber attnum,
 				uint16 procnum);
 extern FmgrInfo *index_getprocinfo(Relation irel, AttrNumber attnum,
 				  uint16 procnum);
+extern void index_store_orderby_distances(IndexScanDesc scan, Oid *orderByTypes,
+										double *distances, bool recheckOrderBy);
 
 /*
  * index access method support routines (in genam.c)
diff --git a/doc/src/sgml/spgist.sgml b/doc/src/sgml/spgist.sgml
index cd4a8d0..53ca8bf 100644
--- a/doc/src/sgml/spgist.sgml
+++ b/doc/src/sgml/spgist.sgml
@@ -584,7 +584,9 @@ CREATE FUNCTION my_inner_consistent(internal, internal) RETURNS void ...
 typedef struct spgInnerConsistentIn
 {
     ScanKey     scankeys;       /* array of operators and comparison values */
+    ScanKey		orderbyKeys;	/* array of ordering operators and comparison values */
     int         nkeys;          /* length of array */
+    int         norderbys;      /* length of array */
 
     Datum       reconstructedValue;     /* value reconstructed at parent */
     void       *traversalValue; /* opclass-specific traverse value */
@@ -607,6 +609,7 @@ typedef struct spgInnerConsistentOut
     int        *levelAdds;      /* increment level by this much for each */
     Datum      *reconstructedValues;    /* associated reconstructed values */
     void      **traversalValues;        /* opclass-specific traverse values */
+    double    **distances;              /* associated distances */
 } spgInnerConsistentOut;
 </programlisting>
 
@@ -621,6 +624,8 @@ typedef struct spgInnerConsistentOut
        In particular it is not necessary to check <structfield>sk_flags</> to
        see if the comparison value is NULL, because the SP-GiST core code
        will filter out such conditions.
+       <structfield>orderbyKeys</>, of length <structfield>norderbys</>,
+       describes ordering operators (if any) in the same fashion.
        <structfield>reconstructedValue</> is the value reconstructed for the
        parent tuple; it is <literal>(Datum) 0</> at the root level or if the
        <function>inner_consistent</> function did not provide a value at the
@@ -661,6 +666,11 @@ typedef struct spgInnerConsistentOut
        <structfield>reconstructedValues</> to an array of the values
        reconstructed for each child node to be visited; otherwise, leave
        <structfield>reconstructedValues</> as NULL.
+       <structfield>distances</> if the ordered search is carried out,
+       the implementation is supposed to fill them in accordance to the
+       ordering operators provided in <structfield>orderbyKeys</>
+       (nodes with lowest distances will be processed first). Leave it
+       NULL otherwise.
        If it is desired to pass down additional out-of-band information
        (<quote>traverse values</>) to lower levels of the tree search,
        set <structfield>traversalValues</> to an array of the appropriate
@@ -669,7 +679,7 @@ typedef struct spgInnerConsistentOut
        Note that the <function>inner_consistent</> function is
        responsible for palloc'ing the
        <structfield>nodeNumbers</>, <structfield>levelAdds</>,
-       <structfield>reconstructedValues</>, and
+       <structfield>distances</>, <structfield>reconstructedValues</> and
        <structfield>traversalValues</> arrays in the current memory context.
        However, any output traverse values pointed to by
        the <structfield>traversalValues</> array should be allocated
@@ -699,7 +709,9 @@ CREATE FUNCTION my_leaf_consistent(internal, internal) RETURNS bool ...
 typedef struct spgLeafConsistentIn
 {
     ScanKey     scankeys;       /* array of operators and comparison values */
-    int         nkeys;          /* length of array */
+    ScanKey     orderbykeys;    /* array of ordering operators and comparison values */
+    int         nkeys;          /* length of scankeys array */
+    int         norderbys;      /* length of orderbykeys array */
 
     Datum       reconstructedValue;     /* value reconstructed at parent */
     void       *traversalValue; /* opclass-specific traverse value */
@@ -711,8 +723,10 @@ typedef struct spgLeafConsistentIn
 
 typedef struct spgLeafConsistentOut
 {
-    Datum       leafValue;      /* reconstructed original data, if any */
-    bool        recheck;        /* set true if operator must be rechecked */
+    Datum       leafValue;        /* reconstructed original data, if any */
+    bool        recheck;          /* set true if operator must be rechecked */
+    bool        recheckDistances; /* set true if distances must be rechecked */
+    double     *distances;        /* associated distances */
 } spgLeafConsistentOut;
 </programlisting>
 
@@ -727,6 +741,8 @@ typedef struct spgLeafConsistentOut
        In particular it is not necessary to check <structfield>sk_flags</> to
        see if the comparison value is NULL, because the SP-GiST core code
        will filter out such conditions.
+       The array <structfield>orderbykeys>, of length <structfield>norderbys</>,
+       describes the ordering operators in the same fashion.
        <structfield>reconstructedValue</> is the value reconstructed for the
        parent tuple; it is <literal>(Datum) 0</> at the root level or if the
        <function>inner_consistent</> function did not provide a value at the
@@ -752,6 +768,13 @@ typedef struct spgLeafConsistentOut
        <structfield>recheck</> may be set to <literal>true</> if the match
        is uncertain and so the operator(s) must be re-applied to the actual
        heap tuple to verify the match.
+       In case when the ordered search is performed, <structfield>distances</>
+       should be set in accordance with the ordering operator provided, otherwise
+       implementation is supposed to set it to NULL.
+       If at least one of returned distances is not exact, the function must set
+       <structfield>recheckDistances</> to true.  In this case the executor will
+       calculate the accurate distance after fetching the tuple from the heap,
+       and reorder the tuples if necessary.       
       </para>
      </listitem>
     </varlistentry>
diff --git a/src/backend/access/spgist/Makefile b/src/backend/access/spgist/Makefile
index 14948a5..5be3df5 100644
--- a/src/backend/access/spgist/Makefile
+++ b/src/backend/access/spgist/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = spgutils.o spginsert.o spgscan.o spgvacuum.o spgvalidate.o \
 	spgdoinsert.o spgxlog.o \
-	spgtextproc.o spgquadtreeproc.o spgkdtreeproc.o
+	spgtextproc.o spgquadtreeproc.o spgkdtreeproc.o \
+	spgproc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/spgist/README b/src/backend/access/spgist/README
index 09ab21a..1420a18 100644
--- a/src/backend/access/spgist/README
+++ b/src/backend/access/spgist/README
@@ -41,7 +41,12 @@ contain exactly one inner tuple.
 
 When the search traversal algorithm reaches an inner tuple, it chooses a set
 of nodes to continue tree traverse in depth.  If it reaches a leaf page it
-scans a list of leaf tuples to find the ones that match the query.
+scans a list of leaf tuples to find the ones that match the query. SP-GiSTs
+also support ordered searches - that is during the scan is performed,
+implementation can choose to map floating-point distances to inner and leaf
+tuples and the traversal will be performed by the closest-first model. It
+can be usefull e.g. for performing nearest-neighbour searches on the dataset.
+
 
 The insertion algorithm descends the tree similarly, except it must choose
 just one node to descend to from each inner tuple.  Insertion might also have
@@ -371,3 +376,4 @@ AUTHORS
 
     Teodor Sigaev <teo...@sigaev.ru>
     Oleg Bartunov <o...@sai.msu.su>
+    Vlad Sterzhanov <glide...@gmail.com>
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index 139d998..f585b62 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -15,79 +15,118 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/relscan.h"
 #include "access/spgist_private.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "utils/builtins.h"
 #include "utils/datum.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
 
 typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
-								 Datum leafValue, bool isnull, bool recheck);
+							   Datum leafValue, bool isnull, bool recheck,
+							   bool recheckDist, double *distances);
 
-typedef struct ScanStackEntry
+/*
+ * Pairing heap comparison function for the SpGistSearchItem queue
+ */
+static int
+pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a,
+								 const pairingheap_node *b, void *arg)
 {
-	Datum		reconstructedValue;		/* value reconstructed from parent */
-	void	   *traversalValue; /* opclass-specific traverse value */
-	int			level;			/* level of items on this page */
-	ItemPointerData ptr;		/* block and offset to scan from */
-} ScanStackEntry;
+	const SpGistSearchItem *sa = (const SpGistSearchItem *) a;
+	const SpGistSearchItem *sb = (const SpGistSearchItem *) b;
+	IndexScanDesc scan = (IndexScanDesc) arg;
+	int			i;
+
+	/* Order according to distance comparison */
+	for (i = 0; i < scan->numberOfOrderBys; i++)
+	{
+		if (sa->distances[i] != sb->distances[i])
+			return (sa->distances[i] < sb->distances[i]) ? 1 : -1;
+	}
 
+	/* Leaf items go before inner pages, to ensure a depth-first search */
+	if (sa->isLeaf && !sb->isLeaf)
+		return 1;
+	if (!sa->isLeaf && sb->isLeaf)
+		return -1;
+
+	return 0;
+}
 
-/* Free a ScanStackEntry */
 static void
-freeScanStackEntry(SpGistScanOpaque so, ScanStackEntry *stackEntry)
+spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item)
 {
 	if (!so->state.attType.attbyval &&
-		DatumGetPointer(stackEntry->reconstructedValue) != NULL)
-		pfree(DatumGetPointer(stackEntry->reconstructedValue));
-	if (stackEntry->traversalValue)
-		pfree(stackEntry->traversalValue);
+		DatumGetPointer(item->value) != NULL)
+		pfree(DatumGetPointer(item->value));
+
+	if (item->traversalValue)
+		pfree(item->traversalValue);
 
-	pfree(stackEntry);
+	pfree(item);
+}
+
+/*
+ * Add SpGistSearchItem to queue
+ *
+ * Called in queue context
+ */
+static void
+spgAddSearchItemToQueue(SpGistScanOpaque so, SpGistSearchItem *item,
+						double *distances)
+{
+	memcpy(item->distances, distances, so->numberOfOrderBys * sizeof(double));
+	pairingheap_add(so->queue, &item->phNode);
 }
 
-/* Free the entire stack */
 static void
-freeScanStack(SpGistScanOpaque so)
+spgAddStartItem(SpGistScanOpaque so, bool isnull)
 {
-	ListCell   *lc;
+	SpGistSearchItem *startEntry = (SpGistSearchItem *) palloc0(
+								SizeOfSpGistSearchItem(so->numberOfOrderBys));
 
-	foreach(lc, so->scanStack)
-	{
-		freeScanStackEntry(so, (ScanStackEntry *) lfirst(lc));
-	}
-	list_free(so->scanStack);
-	so->scanStack = NIL;
+	ItemPointerSet(&startEntry->heap,
+				   isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO,
+				   FirstOffsetNumber);
+	startEntry->isLeaf = false;
+	startEntry->level = 0;
+	startEntry->isnull = isnull;
+
+	spgAddSearchItemToQueue(so, startEntry, so->zeroDistances);
 }
 
 /*
- * Initialize scanStack to search the root page, resetting
+ * Initialize queue to search the root page, resetting
  * any previously active scan
  */
 static void
 resetSpGistScanOpaque(SpGistScanOpaque so)
 {
-	ScanStackEntry *startEntry;
-
-	freeScanStack(so);
+	MemoryContext oldCtx = MemoryContextSwitchTo(so->queueCxt);
 
 	if (so->searchNulls)
-	{
-		/* Stack a work item to scan the null index entries */
-		startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
-		ItemPointerSet(&startEntry->ptr, SPGIST_NULL_BLKNO, FirstOffsetNumber);
-		so->scanStack = lappend(so->scanStack, startEntry);
-	}
+		/* Add a work item to scan the null index entries */
+		spgAddStartItem(so, true);
 
 	if (so->searchNonNulls)
+		/* Add a work item to scan the non-null index entries */
+		spgAddStartItem(so, false);
+
+	MemoryContextSwitchTo(oldCtx);
+
+	if (so->numberOfOrderBys > 0)
 	{
-		/* Stack a work item to scan the non-null index entries */
-		startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
-		ItemPointerSet(&startEntry->ptr, SPGIST_ROOT_BLKNO, FirstOffsetNumber);
-		so->scanStack = lappend(so->scanStack, startEntry);
+		/* Must pfree distances to avoid memory leak */
+		int			i;
+
+		for (i = 0; i < so->nPtrs; i++)
+			pfree(so->distances[i]);
 	}
 
 	if (so->want_itup)
@@ -122,6 +161,9 @@ spgPrepareScanKeys(IndexScanDesc scan)
 	int			nkeys;
 	int			i;
 
+	so->numberOfOrderBys = scan->numberOfOrderBys;
+	so->orderByData = scan->orderByData;
+
 	if (scan->numberOfKeys <= 0)
 	{
 		/* If no quals, whole-index scan is required */
@@ -182,8 +224,9 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
 {
 	IndexScanDesc scan;
 	SpGistScanOpaque so;
+	int			i;
 
-	scan = RelationGetIndexScan(rel, keysz, 0);
+	scan = RelationGetIndexScan(rel, keysz, orderbysz);
 
 	so = (SpGistScanOpaque) palloc0(sizeof(SpGistScanOpaqueData));
 	if (keysz > 0)
@@ -191,6 +234,7 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
 	else
 		so->keyData = NULL;
 	initSpGistState(&so->state, scan->indexRelation);
+
 	so->tempCxt = AllocSetContextCreate(CurrentMemoryContext,
 										"SP-GiST search temporary context",
 										ALLOCSET_DEFAULT_SIZES);
@@ -198,6 +242,36 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
 	/* Set up indexTupDesc and xs_itupdesc in case it's an index-only scan */
 	so->indexTupDesc = scan->xs_itupdesc = RelationGetDescr(rel);
 
+	if (scan->numberOfOrderBys > 0)
+	{
+		so->zeroDistances = palloc(sizeof(double) * scan->numberOfOrderBys);
+		so->infDistances = palloc(sizeof(double) * scan->numberOfOrderBys);
+
+		for (i = 0; i < scan->numberOfOrderBys; i++)
+		{
+			so->zeroDistances[i] = 0.0;
+			so->infDistances[i] = get_float8_infinity();
+		}
+
+		scan->xs_orderbyvals = palloc0(sizeof(Datum) * scan->numberOfOrderBys);
+		scan->xs_orderbynulls = palloc(sizeof(bool) * scan->numberOfOrderBys);
+		memset(scan->xs_orderbynulls, true, sizeof(bool) * scan->numberOfOrderBys);
+	}
+
+	so->queueCxt = AllocSetContextCreate(CurrentMemoryContext,
+										 "SP-GiST queue context",
+										 ALLOCSET_DEFAULT_SIZES);
+
+	fmgr_info_copy(&so->innerConsistentFn,
+				   index_getprocinfo(rel, 1, SPGIST_INNER_CONSISTENT_PROC),
+				   CurrentMemoryContext);
+
+	fmgr_info_copy(&so->leafConsistentFn,
+				   index_getprocinfo(rel, 1, SPGIST_LEAF_CONSISTENT_PROC),
+				   CurrentMemoryContext);
+
+	so->indexCollation = rel->rd_indcollation[0];
+
 	scan->opaque = so;
 
 	return scan;
@@ -208,18 +282,51 @@ spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 		  ScanKey orderbys, int norderbys)
 {
 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
+	MemoryContext oldCxt;
 
 	/* copy scankeys into local storage */
 	if (scankey && scan->numberOfKeys > 0)
-	{
 		memmove(scan->keyData, scankey,
 				scan->numberOfKeys * sizeof(ScanKeyData));
+
+	if (orderbys && scan->numberOfOrderBys > 0)
+	{
+		int			i;
+
+		memmove(scan->orderByData, orderbys,
+				scan->numberOfOrderBys * sizeof(ScanKeyData));
+
+		so->orderByTypes = (Oid *) palloc(sizeof(Oid) * scan->numberOfOrderBys);
+
+		for (i = 0; i < scan->numberOfOrderBys; i++)
+		{
+			ScanKey		skey = &scan->orderByData[i];
+
+			/*
+			 * Look up the datatype returned by the original ordering
+			 * operator. SP-GiST always uses a float8 for the distance function,
+			 * but the ordering operator could be anything else.
+			 *
+			 * XXX: The distance function is only allowed to be lossy if the
+			 * ordering operator's result type is float4 or float8.  Otherwise
+			 * we don't know how to return the distance to the executor.  But
+			 * we cannot check that here, as we won't know if the distance
+			 * function is lossy until it returns *recheck = true for the
+			 * first time.
+			 */
+			so->orderByTypes[i] = get_func_rettype(skey->sk_func.fn_oid);
+		}
 	}
 
 	/* preprocess scankeys, set up the representation in *so */
 	spgPrepareScanKeys(scan);
 
-	/* set up starting stack entries */
+	MemoryContextReset(so->queueCxt);
+	oldCxt = MemoryContextSwitchTo(so->queueCxt);
+	so->queue = pairingheap_allocate(pairingheap_SpGistSearchItem_cmp, scan);
+	MemoryContextSwitchTo(oldCxt);
+
+	/* set up starting queue entries */
 	resetSpGistScanOpaque(so);
 }
 
@@ -229,65 +336,336 @@ spgendscan(IndexScanDesc scan)
 	SpGistScanOpaque so = (SpGistScanOpaque) scan->opaque;
 
 	MemoryContextDelete(so->tempCxt);
+	MemoryContextDelete(so->queueCxt);
+
+	if (scan->numberOfOrderBys > 0)
+	{
+		pfree(so->zeroDistances);
+		pfree(so->infDistances);
+	}
+}
+
+/*
+ * Leaf SpGistSearchItem constructor, called in queue context
+ */
+static SpGistSearchItem *
+spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointerData heapPtr,
+			   Datum leafValue, bool recheckQual, bool recheckDist, bool isnull)
+{
+	SpGistSearchItem *item = (SpGistSearchItem *) palloc(
+								SizeOfSpGistSearchItem(so->numberOfOrderBys));
+
+	item->level = level;
+	item->heap = heapPtr;
+	/* copy value to queue cxt out of tmp cxt */
+	item->value = isnull ? (Datum) 0 :
+		datumCopy(leafValue, so->state.attType.attbyval,
+				  so->state.attType.attlen);
+	item->traversalValue = NULL;
+	item->isLeaf = true;
+	item->recheckQual = recheckQual;
+	item->recheckDist = recheckDist;
+	item->isnull = isnull;
+
+	return item;
 }
 
 /*
  * Test whether a leaf tuple satisfies all the scan keys
  *
- * *leafValue is set to the reconstructed datum, if provided
- * *recheck is set true if any of the operators are lossy
+ * *reportedSome is set to true if:
+ *		the scan is not ordered AND the item satisfies the scankeys
  */
 static bool
-spgLeafTest(Relation index, SpGistScanOpaque so,
+spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
 			SpGistLeafTuple leafTuple, bool isnull,
-			int level, Datum reconstructedValue,
-			void *traversalValue,
-			Datum *leafValue, bool *recheck)
+			bool *reportedSome, storeRes_func storeRes)
 {
+	Datum		leafValue;
+	double	   *distances;
 	bool		result;
-	Datum		leafDatum;
-	spgLeafConsistentIn in;
-	spgLeafConsistentOut out;
-	FmgrInfo   *procinfo;
-	MemoryContext oldCtx;
+	bool		recheckQual;
+	bool		recheckDist;
 
 	if (isnull)
 	{
 		/* Should not have arrived on a nulls page unless nulls are wanted */
 		Assert(so->searchNulls);
-		*leafValue = (Datum) 0;
-		*recheck = false;
-		return true;
+		leafValue = (Datum) 0;
+		/* Assume that all distances for null entries are infinities */
+		distances = so->infDistances;
+		recheckQual = false;
+		recheckDist = false;
+		result = true;
+	}
+	else
+	{
+		spgLeafConsistentIn in;
+		spgLeafConsistentOut out;
+
+		/* use temp context for calling leaf_consistent */
+		MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt);
+
+		in.scankeys = so->keyData;
+		in.nkeys = so->numberOfKeys;
+		in.orderbykeys = so->orderByData;
+		in.norderbys = so->numberOfOrderBys;
+		in.reconstructedValue = item->value;
+		in.traversalValue = item->traversalValue;
+		in.level = item->level;
+		in.returnData = so->want_itup;
+		in.leafDatum = SGLTDATUM(leafTuple, &so->state);
+
+		out.leafValue = (Datum) 0;
+		out.recheck = false;
+		out.distances = NULL;
+		out.recheckDistances = false;
+
+		result = DatumGetBool(FunctionCall2Coll(&so->leafConsistentFn,
+												so->indexCollation,
+												PointerGetDatum(&in),
+												PointerGetDatum(&out)));
+		recheckQual = out.recheck;
+		recheckDist = out.recheckDistances;
+		leafValue = out.leafValue;
+		distances = out.distances;
+
+		MemoryContextSwitchTo(oldCxt);
 	}
 
-	leafDatum = SGLTDATUM(leafTuple, &so->state);
+	if (result)
+	{
+		/* item passes the scankeys */
+		if (so->numberOfOrderBys > 0)
+		{
+			/* the scan is ordered -> add the item to the queue */
+			MemoryContext oldCxt = MemoryContextSwitchTo(so->queueCxt);
+			SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level,
+														leafTuple->heapPtr,
+														leafValue,
+														recheckQual,
+														recheckDist,
+														isnull);
+
+			spgAddSearchItemToQueue(so, heapItem, distances);
+
+			MemoryContextSwitchTo(oldCxt);
+		}
+		else
+		{
+			/* non-ordered scan, so report the item right away */
+			Assert(!recheckDist);
+			storeRes(so, &leafTuple->heapPtr, leafValue, isnull,
+					 recheckQual, false, NULL);
+			*reportedSome = true;
+		}
+	}
 
-	/* use temp context for calling leaf_consistent */
-	oldCtx = MemoryContextSwitchTo(so->tempCxt);
+	return result;
+}
 
-	in.scankeys = so->keyData;
-	in.nkeys = so->numberOfKeys;
-	in.reconstructedValue = reconstructedValue;
-	in.traversalValue = traversalValue;
-	in.level = level;
-	in.returnData = so->want_itup;
-	in.leafDatum = leafDatum;
+/* A bundle initializer for inner_consistent methods */
+static void
+spgInitInnerConsistentIn(spgInnerConsistentIn *in,
+						 SpGistScanOpaque so,
+						 SpGistSearchItem *item,
+						 SpGistInnerTuple innerTuple,
+						 MemoryContext traversalMemoryContext)
+{
+	in->scankeys = so->keyData;
+	in->nkeys = so->numberOfKeys;
+	in->norderbys = so->numberOfOrderBys;
+	in->orderbyKeys = so->orderByData;
+	in->reconstructedValue = item->value;
+	in->traversalMemoryContext = traversalMemoryContext;
+	in->traversalValue = item->traversalValue;
+	in->level = item->level;
+	in->returnData = so->want_itup;
+	in->allTheSame = innerTuple->allTheSame;
+	in->hasPrefix = (innerTuple->prefixSize > 0);
+	in->prefixDatum = SGITDATUM(innerTuple, &so->state);
+	in->nNodes = innerTuple->nNodes;
+	in->nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
+}
+
+static SpGistSearchItem *
+spgMakeInnerItem(SpGistScanOpaque so,
+				 SpGistSearchItem *parentItem,
+				 SpGistNodeTuple tuple,
+				 spgInnerConsistentOut *out, int i, bool isnull)
+{
+	SpGistSearchItem *item = palloc(SizeOfSpGistSearchItem(so->numberOfOrderBys));
+
+	item->heap = tuple->t_tid;
+	item->level = out->levelAdds ? parentItem->level + out->levelAdds[i]
+								 : parentItem->level;
+
+	/* Must copy value out of temp context */
+	item->value = out->reconstructedValues
+					? datumCopy(out->reconstructedValues[i],
+								so->state.attType.attbyval,
+								so->state.attType.attlen)
+					: (Datum) 0;
+
+	/*
+	 * Elements of out.traversalValues should be allocated in
+	 * in.traversalMemoryContext, which is actually a long
+	 * lived context of index scan.
+	 */
+	item->traversalValue =
+			out->traversalValues ? out->traversalValues[i] : NULL;
+
+	item->isLeaf = false;
+	item->recheckQual = false;
+	item->recheckDist = false;
+	item->isnull = isnull;
+
+	return item;
+}
 
-	out.leafValue = (Datum) 0;
-	out.recheck = false;
+static void
+spgInnerTest(SpGistScanOpaque so, SpGistSearchItem *item,
+			 SpGistInnerTuple innerTuple, bool isnull)
+{
+	MemoryContext			oldCxt = MemoryContextSwitchTo(so->tempCxt);
+	spgInnerConsistentOut	out;
+	int						nNodes = innerTuple->nNodes;
+	int						i;
 
-	procinfo = index_getprocinfo(index, 1, SPGIST_LEAF_CONSISTENT_PROC);
-	result = DatumGetBool(FunctionCall2Coll(procinfo,
-											index->rd_indcollation[0],
-											PointerGetDatum(&in),
-											PointerGetDatum(&out)));
+	memset(&out, 0, sizeof(out));
 
-	*leafValue = out.leafValue;
-	*recheck = out.recheck;
+	if (!isnull)
+	{
+		spgInnerConsistentIn in;
 
-	MemoryContextSwitchTo(oldCtx);
+		spgInitInnerConsistentIn(&in, so, item, innerTuple, oldCxt);
 
-	return result;
+		/* use user-defined inner consistent method */
+		FunctionCall2Coll(&so->innerConsistentFn,
+						  so->indexCollation,
+						  PointerGetDatum(&in),
+						  PointerGetDatum(&out));
+	}
+	else
+	{
+		/* force all children to be visited */
+		out.nNodes = nNodes;
+		out.nodeNumbers = (int *) palloc(sizeof(int) * nNodes);
+		for (i = 0; i < nNodes; i++)
+			out.nodeNumbers[i] = i;
+	}
+
+	/* If allTheSame, they should all or none of 'em match */
+	if (innerTuple->allTheSame)
+		if (out.nNodes != 0 && out.nNodes != nNodes)
+			elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
+
+	if (out.nNodes)
+	{
+		/* collect node pointers */
+		SpGistNodeTuple		node;
+		SpGistNodeTuple	   *nodes = (SpGistNodeTuple *) palloc(
+									sizeof(SpGistNodeTuple) * nNodes);
+
+		SGITITERATE(innerTuple, i, node)
+		{
+			nodes[i] = node;
+		}
+
+		MemoryContextSwitchTo(so->queueCxt);
+
+		for (i = 0; i < out.nNodes; i++)
+		{
+			int			nodeN = out.nodeNumbers[i];
+			SpGistSearchItem *innerItem;
+
+			Assert(nodeN >= 0 && nodeN < nNodes);
+
+			node = nodes[nodeN];
+
+			if (!ItemPointerIsValid(&node->t_tid))
+				continue;
+
+			innerItem = spgMakeInnerItem(so, item, node, &out, i, isnull);
+
+			/* Will copy out the distances in spgAddSearchItemToQueue anyway */
+			spgAddSearchItemToQueue(so, innerItem,
+									out.distances ? out.distances[i]
+												  : so->infDistances);
+		}
+	}
+
+	MemoryContextSwitchTo(oldCxt);
+}
+
+/* Returns a next item in an (ordered) scan or null if the index is exhausted */
+static SpGistSearchItem *
+spgGetNextQueueItem(SpGistScanOpaque so)
+{
+	SpGistSearchItem *item;
+
+	if (!pairingheap_is_empty(so->queue))
+		item = (SpGistSearchItem *) pairingheap_remove_first(so->queue);
+	else
+		/* Done when both heaps are empty */
+		item = NULL;
+
+	/* Return item; caller is responsible to pfree it */
+	return item;
+}
+
+enum SpGistSpecialOffsetNumbers
+{
+	SpGistBreakOffsetNumber = InvalidOffsetNumber,
+	SpGistRedirectOffsetNumber = MaxOffsetNumber + 1,
+	SpGistErrorOffsetNumber = MaxOffsetNumber + 2
+};
+
+static OffsetNumber
+spgTestLeafTuple(SpGistScanOpaque so,
+				 SpGistSearchItem *item,
+				 Page page, OffsetNumber offset,
+				 bool isnull, bool isroot,
+				 bool *reportedSome,
+				 storeRes_func storeRes)
+{
+	SpGistLeafTuple leafTuple = (SpGistLeafTuple)
+		PageGetItem(page, PageGetItemId(page, offset));
+
+	if (leafTuple->tupstate != SPGIST_LIVE)
+	{
+		if (!isroot) /* all tuples on root should be live */
+		{
+			if (leafTuple->tupstate == SPGIST_REDIRECT)
+			{
+				/* redirection tuple should be first in chain */
+				Assert(offset == ItemPointerGetOffsetNumber(&item->heap));
+				/* transfer attention to redirect point */
+				item->heap = ((SpGistDeadTuple) leafTuple)->pointer;
+				Assert(ItemPointerGetBlockNumber(&item->heap) != SPGIST_METAPAGE_BLKNO);
+				return SpGistRedirectOffsetNumber;
+			}
+
+			if (leafTuple->tupstate == SPGIST_DEAD)
+			{
+				/* dead tuple should be first in chain */
+				Assert(offset == ItemPointerGetOffsetNumber(&item->heap));
+				/* No live entries on this page */
+				Assert(leafTuple->nextOffset == InvalidOffsetNumber);
+				return SpGistBreakOffsetNumber;
+			}
+		}
+
+		/* We should not arrive at a placeholder */
+		elog(ERROR, "unexpected SPGiST tuple state: %d", leafTuple->tupstate);
+		return SpGistErrorOffsetNumber;
+	}
+
+	Assert(ItemPointerIsValid(&leafTuple->heapPtr));
+
+	spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes);
+
+	return leafTuple->nextOffset;
 }
 
 /*
@@ -306,247 +684,101 @@ spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
 
 	while (scanWholeIndex || !reportedSome)
 	{
-		ScanStackEntry *stackEntry;
-		BlockNumber blkno;
-		OffsetNumber offset;
-		Page		page;
-		bool		isnull;
+		SpGistSearchItem *item = spgGetNextQueueItem(so);
 
-		/* Pull next to-do item from the list */
-		if (so->scanStack == NIL)
-			break;				/* there are no more pages to scan */
-
-		stackEntry = (ScanStackEntry *) linitial(so->scanStack);
-		so->scanStack = list_delete_first(so->scanStack);
+		if (item == NULL)
+			break; /* No more items in queue -> done */
 
 redirect:
 		/* Check for interrupts, just in case of infinite loop */
 		CHECK_FOR_INTERRUPTS();
 
-		blkno = ItemPointerGetBlockNumber(&stackEntry->ptr);
-		offset = ItemPointerGetOffsetNumber(&stackEntry->ptr);
-
-		if (buffer == InvalidBuffer)
+		if (item->isLeaf)
 		{
-			buffer = ReadBuffer(index, blkno);
-			LockBuffer(buffer, BUFFER_LOCK_SHARE);
+			/* We store heap items in the queue only in case of ordered search */
+			Assert(so->numberOfOrderBys > 0);
+			storeRes(so, &item->heap, item->value, item->isnull,
+					 item->recheckQual, item->recheckDist, item->distances);
+			reportedSome = true;
 		}
-		else if (blkno != BufferGetBlockNumber(buffer))
+		else
 		{
-			UnlockReleaseBuffer(buffer);
-			buffer = ReadBuffer(index, blkno);
-			LockBuffer(buffer, BUFFER_LOCK_SHARE);
-		}
-		/* else new pointer points to the same page, no work needed */
+			BlockNumber		blkno  = ItemPointerGetBlockNumber(&item->heap);
+			OffsetNumber	offset = ItemPointerGetOffsetNumber(&item->heap);
+			Page			page;
+			bool			isnull;
 
-		page = BufferGetPage(buffer);
-		TestForOldSnapshot(snapshot, index, page);
+			if (buffer == InvalidBuffer)
+			{
+				buffer = ReadBuffer(index, blkno);
+				LockBuffer(buffer, BUFFER_LOCK_SHARE);
+			}
+			else if (blkno != BufferGetBlockNumber(buffer))
+			{
+				UnlockReleaseBuffer(buffer);
+				buffer = ReadBuffer(index, blkno);
+				LockBuffer(buffer, BUFFER_LOCK_SHARE);
+			}
 
-		isnull = SpGistPageStoresNulls(page) ? true : false;
+			/* else new pointer points to the same page, no work needed */
 
-		if (SpGistPageIsLeaf(page))
-		{
-			SpGistLeafTuple leafTuple;
-			OffsetNumber max = PageGetMaxOffsetNumber(page);
-			Datum		leafValue = (Datum) 0;
-			bool		recheck = false;
+			page = BufferGetPage(buffer);
+			TestForOldSnapshot(snapshot, index, page);
 
-			if (SpGistBlockIsRoot(blkno))
+			isnull = SpGistPageStoresNulls(page) ? true : false;
+
+			if (SpGistPageIsLeaf(page))
 			{
-				/* When root is a leaf, examine all its tuples */
-				for (offset = FirstOffsetNumber; offset <= max; offset++)
-				{
-					leafTuple = (SpGistLeafTuple)
-						PageGetItem(page, PageGetItemId(page, offset));
-					if (leafTuple->tupstate != SPGIST_LIVE)
-					{
-						/* all tuples on root should be live */
-						elog(ERROR, "unexpected SPGiST tuple state: %d",
-							 leafTuple->tupstate);
-					}
+				/* Page is a leaf - that is, all it's tuples are heap items */
+				OffsetNumber max = PageGetMaxOffsetNumber(page);
 
-					Assert(ItemPointerIsValid(&leafTuple->heapPtr));
-					if (spgLeafTest(index, so,
-									leafTuple, isnull,
-									stackEntry->level,
-									stackEntry->reconstructedValue,
-									stackEntry->traversalValue,
-									&leafValue,
-									&recheck))
-					{
-						storeRes(so, &leafTuple->heapPtr,
-								 leafValue, isnull, recheck);
-						reportedSome = true;
-					}
+				if (SpGistBlockIsRoot(blkno))
+				{
+					/* When root is a leaf, examine all its tuples */
+					for (offset = FirstOffsetNumber; offset <= max; offset++)
+						(void) spgTestLeafTuple(so, item, page, offset,
+												isnull, true,
+												&reportedSome, storeRes);
 				}
-			}
-			else
-			{
-				/* Normal case: just examine the chain we arrived at */
-				while (offset != InvalidOffsetNumber)
+				else
 				{
-					Assert(offset >= FirstOffsetNumber && offset <= max);
-					leafTuple = (SpGistLeafTuple)
-						PageGetItem(page, PageGetItemId(page, offset));
-					if (leafTuple->tupstate != SPGIST_LIVE)
+					/* Normal case: just examine the chain we arrived at */
+					while (offset != InvalidOffsetNumber)
 					{
-						if (leafTuple->tupstate == SPGIST_REDIRECT)
-						{
-							/* redirection tuple should be first in chain */
-							Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr));
-							/* transfer attention to redirect point */
-							stackEntry->ptr = ((SpGistDeadTuple) leafTuple)->pointer;
-							Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO);
+						Assert(offset >= FirstOffsetNumber && offset <= max);
+						offset = spgTestLeafTuple(so, item, page, offset,
+												  isnull, false,
+												  &reportedSome, storeRes);
+						if (offset == SpGistRedirectOffsetNumber)
 							goto redirect;
-						}
-						if (leafTuple->tupstate == SPGIST_DEAD)
-						{
-							/* dead tuple should be first in chain */
-							Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr));
-							/* No live entries on this page */
-							Assert(leafTuple->nextOffset == InvalidOffsetNumber);
-							break;
-						}
-						/* We should not arrive at a placeholder */
-						elog(ERROR, "unexpected SPGiST tuple state: %d",
-							 leafTuple->tupstate);
-					}
-
-					Assert(ItemPointerIsValid(&leafTuple->heapPtr));
-					if (spgLeafTest(index, so,
-									leafTuple, isnull,
-									stackEntry->level,
-									stackEntry->reconstructedValue,
-									stackEntry->traversalValue,
-									&leafValue,
-									&recheck))
-					{
-						storeRes(so, &leafTuple->heapPtr,
-								 leafValue, isnull, recheck);
-						reportedSome = true;
 					}
-
-					offset = leafTuple->nextOffset;
-				}
-			}
-		}
-		else	/* page is inner */
-		{
-			SpGistInnerTuple innerTuple;
-			spgInnerConsistentIn in;
-			spgInnerConsistentOut out;
-			FmgrInfo   *procinfo;
-			SpGistNodeTuple *nodes;
-			SpGistNodeTuple node;
-			int			i;
-			MemoryContext oldCtx;
-
-			innerTuple = (SpGistInnerTuple) PageGetItem(page,
-												PageGetItemId(page, offset));
-
-			if (innerTuple->tupstate != SPGIST_LIVE)
-			{
-				if (innerTuple->tupstate == SPGIST_REDIRECT)
-				{
-					/* transfer attention to redirect point */
-					stackEntry->ptr = ((SpGistDeadTuple) innerTuple)->pointer;
-					Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO);
-					goto redirect;
 				}
-				elog(ERROR, "unexpected SPGiST tuple state: %d",
-					 innerTuple->tupstate);
 			}
-
-			/* use temp context for calling inner_consistent */
-			oldCtx = MemoryContextSwitchTo(so->tempCxt);
-
-			in.scankeys = so->keyData;
-			in.nkeys = so->numberOfKeys;
-			in.reconstructedValue = stackEntry->reconstructedValue;
-			in.traversalMemoryContext = oldCtx;
-			in.traversalValue = stackEntry->traversalValue;
-			in.level = stackEntry->level;
-			in.returnData = so->want_itup;
-			in.allTheSame = innerTuple->allTheSame;
-			in.hasPrefix = (innerTuple->prefixSize > 0);
-			in.prefixDatum = SGITDATUM(innerTuple, &so->state);
-			in.nNodes = innerTuple->nNodes;
-			in.nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
-
-			/* collect node pointers */
-			nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * in.nNodes);
-			SGITITERATE(innerTuple, i, node)
-			{
-				nodes[i] = node;
-			}
-
-			memset(&out, 0, sizeof(out));
-
-			if (!isnull)
+			else	/* page is inner */
 			{
-				/* use user-defined inner consistent method */
-				procinfo = index_getprocinfo(index, 1, SPGIST_INNER_CONSISTENT_PROC);
-				FunctionCall2Coll(procinfo,
-								  index->rd_indcollation[0],
-								  PointerGetDatum(&in),
-								  PointerGetDatum(&out));
-			}
-			else
-			{
-				/* force all children to be visited */
-				out.nNodes = in.nNodes;
-				out.nodeNumbers = (int *) palloc(sizeof(int) * in.nNodes);
-				for (i = 0; i < in.nNodes; i++)
-					out.nodeNumbers[i] = i;
-			}
-
-			MemoryContextSwitchTo(oldCtx);
-
-			/* If allTheSame, they should all or none of 'em match */
-			if (innerTuple->allTheSame)
-				if (out.nNodes != 0 && out.nNodes != in.nNodes)
-					elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
-
-			for (i = 0; i < out.nNodes; i++)
-			{
-				int			nodeN = out.nodeNumbers[i];
+				SpGistInnerTuple innerTuple = (SpGistInnerTuple)
+						PageGetItem(page, PageGetItemId(page, offset));
 
-				Assert(nodeN >= 0 && nodeN < in.nNodes);
-				if (ItemPointerIsValid(&nodes[nodeN]->t_tid))
+				if (innerTuple->tupstate != SPGIST_LIVE)
 				{
-					ScanStackEntry *newEntry;
-
-					/* Create new work item for this node */
-					newEntry = palloc(sizeof(ScanStackEntry));
-					newEntry->ptr = nodes[nodeN]->t_tid;
-					if (out.levelAdds)
-						newEntry->level = stackEntry->level + out.levelAdds[i];
-					else
-						newEntry->level = stackEntry->level;
-					/* Must copy value out of temp context */
-					if (out.reconstructedValues)
-						newEntry->reconstructedValue =
-							datumCopy(out.reconstructedValues[i],
-									  so->state.attType.attbyval,
-									  so->state.attType.attlen);
-					else
-						newEntry->reconstructedValue = (Datum) 0;
-
-					/*
-					 * Elements of out.traversalValues should be allocated in
-					 * in.traversalMemoryContext, which is actually a long
-					 * lived context of index scan.
-					 */
-					newEntry->traversalValue = (out.traversalValues) ?
-						out.traversalValues[i] : NULL;
-
-					so->scanStack = lcons(newEntry, so->scanStack);
+					if (innerTuple->tupstate == SPGIST_REDIRECT)
+					{
+						/* transfer attention to redirect point */
+						item->heap = ((SpGistDeadTuple) innerTuple)->pointer;
+						Assert(ItemPointerGetBlockNumber(&item->heap) !=
+							   SPGIST_METAPAGE_BLKNO);
+						goto redirect;
+					}
+					elog(ERROR, "unexpected SPGiST tuple state: %d",
+						 innerTuple->tupstate);
 				}
+
+				spgInnerTest(so, item, innerTuple, isnull);
 			}
 		}
 
-		/* done with this scan stack entry */
-		freeScanStackEntry(so, stackEntry);
+		/* done with this scan item */
+		spgFreeSearchItem(so, item);
 		/* clear temp context before proceeding to the next one */
 		MemoryContextReset(so->tempCxt);
 	}
@@ -555,11 +787,14 @@ redirect:
 		UnlockReleaseBuffer(buffer);
 }
 
+
 /* storeRes subroutine for getbitmap case */
 static void
 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
-			Datum leafValue, bool isnull, bool recheck)
+			Datum leafValue, bool isnull, bool recheck, bool recheckDist,
+			double *distances)
 {
+	Assert(!recheckDist && !distances);
 	tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
 	so->ntids++;
 }
@@ -583,11 +818,21 @@ spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 /* storeRes subroutine for gettuple case */
 static void
 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
-			  Datum leafValue, bool isnull, bool recheck)
+			  Datum leafValue, bool isnull, bool recheck, bool recheckDist,
+			  double *distances)
 {
 	Assert(so->nPtrs < MaxIndexTuplesPerPage);
 	so->heapPtrs[so->nPtrs] = *heapPtr;
 	so->recheck[so->nPtrs] = recheck;
+	so->recheckDist[so->nPtrs] = recheckDist;
+
+	if (so->numberOfOrderBys > 0)
+	{
+		Size		size = sizeof(double) * so->numberOfOrderBys;
+
+		so->distances[so->nPtrs] = memcpy(palloc(size), distances, size);
+	}
+
 	if (so->want_itup)
 	{
 		/*
@@ -616,14 +861,28 @@ spggettuple(IndexScanDesc scan, ScanDirection dir)
 	{
 		if (so->iPtr < so->nPtrs)
 		{
-			/* continuing to return tuples from a leaf page */
+			/* continuing to return reported tuples */
 			scan->xs_ctup.t_self = so->heapPtrs[so->iPtr];
 			scan->xs_recheck = so->recheck[so->iPtr];
 			scan->xs_itup = so->indexTups[so->iPtr];
+
+			if (so->numberOfOrderBys > 0)
+				index_store_orderby_distances(scan, so->orderByTypes,
+											  so->distances[so->iPtr],
+											  so->recheckDist[so->iPtr]);
 			so->iPtr++;
 			return true;
 		}
 
+		if (so->numberOfOrderBys > 0)
+		{
+			/* Must pfree distances to avoid memory leak */
+			int			i;
+
+			for (i = 0; i < so->nPtrs; i++)
+				pfree(so->distances[i]);
+		}
+
 		if (so->want_itup)
 		{
 			/* Must pfree IndexTuples to avoid memory leak */
diff --git a/src/backend/access/spgist/spgtextproc.c b/src/backend/access/spgist/spgtextproc.c
index 8678854..00eb27f 100644
--- a/src/backend/access/spgist/spgtextproc.c
+++ b/src/backend/access/spgist/spgtextproc.c
@@ -86,6 +86,7 @@ spg_text_config(PG_FUNCTION_ARGS)
 	cfg->labelType = INT2OID;
 	cfg->canReturnData = true;
 	cfg->longValuesOK = true;	/* suffixing will shorten long values */
+	cfg->suppLen = 0; /* we don't need any supplimentary data */
 	PG_RETURN_VOID();
 }
 
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index 78846be..9cfdcb1 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -15,16 +15,21 @@
 
 #include "postgres.h"
 
+#include "access/amvalidate.h"
+#include "access/htup_details.h"
 #include "access/reloptions.h"
 #include "access/spgist_private.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "catalog/pg_amop.h"
 #include "storage/bufmgr.h"
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
+#include "utils/catcache.h"
 #include "utils/index_selfuncs.h"
 #include "utils/lsyscache.h"
+#include "utils/syscache.h"
 
 
 /*
@@ -39,7 +44,7 @@ spghandler(PG_FUNCTION_ARGS)
 	amroutine->amstrategies = 0;
 	amroutine->amsupport = SPGISTNProc;
 	amroutine->amcanorder = false;
-	amroutine->amcanorderbyop = false;
+	amroutine->amcanorderbyop = true;
 	amroutine->amcanbackward = false;
 	amroutine->amcanunique = false;
 	amroutine->amcanmulticol = false;
@@ -59,7 +64,7 @@ spghandler(PG_FUNCTION_ARGS)
 	amroutine->amcanreturn = spgcanreturn;
 	amroutine->amcostestimate = spgcostestimate;
 	amroutine->amoptions = spgoptions;
-	amroutine->amproperty = NULL;
+	amroutine->amproperty = spgproperty;
 	amroutine->amvalidate = spgvalidate;
 	amroutine->ambeginscan = spgbeginscan;
 	amroutine->amrescan = spgrescan;
@@ -910,3 +915,82 @@ SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
 
 	return offnum;
 }
+
+/*
+ *	spgproperty() -- Check boolean properties of indexes.
+ *
+ * This is optional for most AMs, but is required for SP-GiST because the core
+ * property code doesn't support AMPROP_DISTANCE_ORDERABLE.
+ */
+bool
+spgproperty(Oid index_oid, int attno,
+			IndexAMProperty prop, const char *propname,
+			bool *res, bool *isnull)
+{
+	Oid			opclass,
+				opfamily,
+				opcintype;
+	CatCList   *catlist;
+	int			i;
+
+	/* Only answer column-level inquiries */
+	if (attno == 0)
+		return false;
+
+	switch (prop)
+	{
+		case AMPROP_DISTANCE_ORDERABLE:
+			break;
+		default:
+			return false;
+	}
+
+	/*
+	 * Currently, SP-GiST distance-ordered scans require that there be a
+	 * distance operator in the opclass with the default types. So we assume
+	 * that if such a operator exists, then there's a reason for it.
+	 */
+
+	/* First we need to know the column's opclass. */
+	opclass = get_index_column_opclass(index_oid, attno);
+	if (!OidIsValid(opclass))
+	{
+		*isnull = true;
+		return true;
+	}
+
+	/* Now look up the opclass family and input datatype. */
+	if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
+	{
+		*isnull = true;
+		return true;
+	}
+
+	/* And now we can check whether the operator is provided. */
+	catlist = SearchSysCacheList1(AMOPSTRATEGY,
+								  ObjectIdGetDatum(opfamily));
+
+	*res = false;
+
+	for (i = 0; i < catlist->n_members; i++)
+	{
+		HeapTuple	amoptup = &catlist->members[i]->tuple;
+		Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup);
+
+		if (amopform->amoppurpose == AMOP_ORDER &&
+			(amopform->amoplefttype == opcintype ||
+			 amopform->amoprighttype == opcintype) &&
+			 opfamily_can_sort_type(amopform->amopsortfamily,
+									get_op_rettype(amopform->amopopr)))
+		{
+			*res = true;
+			break;
+		}
+	}
+
+	ReleaseSysCacheList(catlist);
+
+	*isnull = false;
+
+	return true;
+}
diff --git a/src/backend/access/spgist/spgvalidate.c b/src/backend/access/spgist/spgvalidate.c
index 1bc5bce..0a3eeb6 100644
--- a/src/backend/access/spgist/spgvalidate.c
+++ b/src/backend/access/spgist/spgvalidate.c
@@ -22,6 +22,7 @@
 #include "catalog/pg_opfamily.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
+#include "utils/lsyscache.h"
 #include "utils/regproc.h"
 #include "utils/syscache.h"
 
@@ -138,6 +139,7 @@ spgvalidate(Oid opclassoid)
 	{
 		HeapTuple	oprtup = &oprlist->members[i]->tuple;
 		Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
+		Oid			op_rettype;
 
 		/* TODO: Check that only allowed strategy numbers exist */
 		if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63)
@@ -151,20 +153,26 @@ spgvalidate(Oid opclassoid)
 			result = false;
 		}
 
-		/* spgist doesn't support ORDER BY operators */
-		if (oprform->amoppurpose != AMOP_SEARCH ||
-			OidIsValid(oprform->amopsortfamily))
+		/* spgist supports ORDER BY operators */
+		if (oprform->amoppurpose != AMOP_SEARCH)
 		{
-			ereport(INFO,
-					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
-					 errmsg("spgist operator family \"%s\" contains invalid ORDER BY specification for operator %s",
-							opfamilyname,
-							format_operator(oprform->amopopr))));
-			result = false;
+			/* ... and operator result must match the claimed btree opfamily */
+			op_rettype = get_op_rettype(oprform->amopopr);
+			if (!opfamily_can_sort_type(oprform->amopsortfamily, op_rettype))
+			{
+				ereport(INFO,
+						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+						 errmsg("spgist operator family %s contains incorrect ORDER BY opfamily specification for operator %s",
+								opfamilyname,
+								format_operator(oprform->amopopr))));
+				result = false;
+			}
 		}
+		else
+			op_rettype = BOOLOID;
 
 		/* Check operator signature --- same for all spgist strategies */
-		if (!check_amop_signature(oprform->amopopr, BOOLOID,
+		if (!check_amop_signature(oprform->amopopr, op_rettype,
 								  oprform->amoplefttype,
 								  oprform->amoprighttype))
 		{
diff --git a/src/include/access/spgist.h b/src/include/access/spgist.h
index aaf78bc..23ed9bb 100644
--- a/src/include/access/spgist.h
+++ b/src/include/access/spgist.h
@@ -133,7 +133,9 @@ typedef struct spgPickSplitOut
 typedef struct spgInnerConsistentIn
 {
 	ScanKey		scankeys;		/* array of operators and comparison values */
+	ScanKey		orderbyKeys;
 	int			nkeys;			/* length of array */
+	int			norderbys;
 
 	Datum		reconstructedValue;		/* value reconstructed at parent */
 	void	   *traversalValue; /* opclass-specific traverse value */
@@ -157,6 +159,7 @@ typedef struct spgInnerConsistentOut
 	int		   *levelAdds;		/* increment level by this much for each */
 	Datum	   *reconstructedValues;	/* associated reconstructed values */
 	void	  **traversalValues;	/* opclass-specific traverse values */
+	double	  **distances;		/* associated distances */
 } spgInnerConsistentOut;
 
 /*
@@ -165,7 +168,10 @@ typedef struct spgInnerConsistentOut
 typedef struct spgLeafConsistentIn
 {
 	ScanKey		scankeys;		/* array of operators and comparison values */
+	ScanKey		orderbykeys;	/* array of ordering operators and comparison
+								 * values */
 	int			nkeys;			/* length of array */
+	int			norderbys;
 
 	Datum		reconstructedValue;		/* value reconstructed at parent */
 	void	   *traversalValue; /* opclass-specific traverse value */
@@ -179,6 +185,8 @@ typedef struct spgLeafConsistentOut
 {
 	Datum		leafValue;		/* reconstructed original data, if any */
 	bool		recheck;		/* set true if operator must be rechecked */
+	bool		recheckDistances; /* set true if distances must be rechecked */
+	double	   *distances;		/* associated distances */
 } spgLeafConsistentOut;
 
 
diff --git a/src/include/access/spgist_private.h b/src/include/access/spgist_private.h
index b2979a9..14d0700 100644
--- a/src/include/access/spgist_private.h
+++ b/src/include/access/spgist_private.h
@@ -16,8 +16,10 @@
 
 #include "access/itup.h"
 #include "access/spgist.h"
+#include "lib/rbtree.h"
 #include "nodes/tidbitmap.h"
 #include "storage/buf.h"
+#include "storage/relfilenode.h"
 #include "utils/relcache.h"
 
 
@@ -129,12 +131,34 @@ typedef struct SpGistState
 	bool		isBuild;		/* true if doing index build */
 } SpGistState;
 
+typedef struct SpGistSearchItem
+{
+	pairingheap_node phNode;	/* pairing heap node */
+	Datum		value;			/* value reconstructed from parent or
+								 * leafValue if heaptuple */
+	void	   *traversalValue; /* opclass-specific traverse value */
+	int			level;			/* level of items on this page */
+	ItemPointerData heap;		/* heap info, if heap tuple */
+	bool		isLeaf;			/* SearchItem is heap item */
+	bool		recheckQual;	/* qual recheck is needed */
+	bool		recheckDist;	/* distance recheck is needed */
+	bool		isnull;
+
+	/* array with numberOfOrderBys entries */
+	double		distances[FLEXIBLE_ARRAY_MEMBER];
+} SpGistSearchItem;
+
+#define SizeOfSpGistSearchItem(n_distances) \
+	(offsetof(SpGistSearchItem, distances) + sizeof(double) * (n_distances))
+
 /*
  * Private state of an index scan
  */
 typedef struct SpGistScanOpaqueData
 {
 	SpGistState state;			/* see above */
+	pairingheap *queue;			/* queue of unvisited items */
+	MemoryContext queueCxt;		/* context holding the queue */
 	MemoryContext tempCxt;		/* short-lived memory context */
 
 	/* Control flags showing whether to search nulls and/or non-nulls */
@@ -144,9 +168,17 @@ typedef struct SpGistScanOpaqueData
 	/* Index quals to be passed to opclass (null-related quals removed) */
 	int			numberOfKeys;	/* number of index qualifier conditions */
 	ScanKey		keyData;		/* array of index qualifier descriptors */
+	int			numberOfOrderBys;
+	ScanKey		orderByData;
+	Oid		   *orderByTypes;
+
+	FmgrInfo	innerConsistentFn;
+	FmgrInfo	leafConsistentFn;
+	Oid			indexCollation;
 
-	/* Stack of yet-to-be-visited pages */
-	List	   *scanStack;		/* List of ScanStackEntrys */
+	/* Pre-allocated workspace arrays: */
+	double	   *zeroDistances;
+	double	   *infDistances;
 
 	/* These fields are only used in amgetbitmap scans: */
 	TIDBitmap  *tbm;			/* bitmap being filled */
@@ -159,7 +191,9 @@ typedef struct SpGistScanOpaqueData
 	int			iPtr;			/* index for scanning through same */
 	ItemPointerData heapPtrs[MaxIndexTuplesPerPage];	/* TIDs from cur page */
 	bool		recheck[MaxIndexTuplesPerPage]; /* their recheck flags */
+	bool		recheckDist[MaxIndexTuplesPerPage]; /* distance recheck flags */
 	IndexTuple	indexTups[MaxIndexTuplesPerPage];		/* reconstructed tuples */
+	double	   *distances[MaxIndexTuplesPerPage];	/* distances (for recheck) */
 
 	/*
 	 * Note: using MaxIndexTuplesPerPage above is a bit hokey since
@@ -637,6 +671,9 @@ extern OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page,
 					 Item item, Size size,
 					 OffsetNumber *startOffset,
 					 bool errorOK);
+extern bool spgproperty(Oid index_oid, int attno,
+			IndexAMProperty prop, const char *propname,
+			bool *res, bool *isnull);
 
 /* spgdoinsert.c */
 extern void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
diff --git a/doc/src/sgml/spgist.sgml b/doc/src/sgml/spgist.sgml
index 53ca8bf..b0d1cb3 100644
--- a/doc/src/sgml/spgist.sgml
+++ b/doc/src/sgml/spgist.sgml
@@ -70,6 +70,7 @@
       <entry>Name</entry>
       <entry>Indexed Data Type</entry>
       <entry>Indexable Operators</entry>
+      <entry>Ordering Operators</entry>
      </row>
     </thead>
     <tbody>
@@ -84,6 +85,9 @@
        <literal>&gt;^</>
        <literal>~=</>
       </entry>
+      <entry>
+       <literal>&lt;-&gt;</>
+      </entry>
      </row>
      <row>
       <entry><literal>quad_point_ops</></entry>
@@ -96,6 +100,9 @@
        <literal>&gt;^</>
        <literal>~=</>
       </entry>
+      <entry>
+       <literal>&lt;-&gt;</>
+      </entry>
      </row>
      <row>
       <entry><literal>range_ops</></entry>
@@ -111,6 +118,8 @@
        <literal>&gt;&gt;</>
        <literal>@&gt;</>
       </entry>
+      <entry>
+      </entry>
      </row>
      <row>
       <entry><literal>box_ops</></entry>
@@ -129,6 +138,8 @@
        <literal>|&gt;&gt;</literal>
        <literal>|&amp;&gt;</>
       </entry>
+      <entry>
+      </entry>
      </row>
      <row>
       <entry><literal>text_ops</></entry>
@@ -144,6 +155,8 @@
        <literal>~&gt;=~</>
        <literal>~&gt;~</>
       </entry>
+      <entry>
+      </entry>      
      </row>
      <row>
       <entry><literal>inet_ops</></entry>
@@ -161,6 +174,8 @@
        <literal>&lt;=</>
        <literal>=</>
       </entry>
+      <entry>
+      </entry>
      </row>
     </tbody>
    </tgroup>
@@ -172,6 +187,10 @@
   supports the same operators but uses a different index data structure which
   may offer better performance in some applications.
  </para>
+ <para>
+  By supporting the ordering &lt;-&gt; operator the quad_point_ops and kd_point_ops provide 
+  a user with the ability to perform a K-nearest-neighbour search over the indexed point dataset.
+ </para>
 
 </sect1>
 
diff --git a/src/backend/access/spgist/spgkdtreeproc.c b/src/backend/access/spgist/spgkdtreeproc.c
index 9a2649b..e074c3e 100644
--- a/src/backend/access/spgist/spgkdtreeproc.c
+++ b/src/backend/access/spgist/spgkdtreeproc.c
@@ -17,6 +17,7 @@
 
 #include "access/spgist.h"
 #include "access/stratnum.h"
+#include "access/spgist_private.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
 #include "utils/geo_decls.h"
@@ -162,6 +163,7 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS)
 	double		coord;
 	int			which;
 	int			i;
+	BOX			boxes[2];
 
 	Assert(in->hasPrefix);
 	coord = DatumGetFloat8(in->prefixDatum);
@@ -248,12 +250,75 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS)
 	}
 
 	/* We must descend into the children identified by which */
-	out->nodeNumbers = (int *) palloc(sizeof(int) * 2);
 	out->nNodes = 0;
+
+	if (!which)
+		PG_RETURN_VOID();
+
+	out->nodeNumbers = (int *) palloc(sizeof(int) * 2);
+
+	if (in->norderbys > 0)
+	{
+		BOX			infArea;
+		BOX		   *area;
+
+		out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
+		out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
+
+		if (in->level == 0)
+		{
+			float8		inf = get_float8_infinity();
+
+			area = box_fill(&infArea, -inf, inf, -inf, inf);
+		}
+		else
+		{
+			area = (BOX *) in->traversalValue;
+			Assert(area);
+		}
+
+		boxes[0].low = area->low;
+		boxes[1].high = area->high;
+
+		if (in->level % 2)
+		{
+			/* split box by x */
+			boxes[0].high.x = boxes[1].low.x = coord;
+			boxes[0].high.y = area->high.y;
+			boxes[1].low.y = area->low.y;
+		}
+		else
+		{
+			/* split box by y */
+			boxes[0].high.y = boxes[1].low.y = coord;
+			boxes[0].high.x = area->high.x;
+			boxes[1].low.x = area->low.x;
+		}
+	}
+
 	for (i = 1; i <= 2; i++)
 	{
 		if (which & (1 << i))
-			out->nodeNumbers[out->nNodes++] = i - 1;
+		{
+			out->nodeNumbers[out->nNodes] = i - 1;
+
+			if (in->norderbys > 0)
+			{
+				MemoryContext oldCtx = MemoryContextSwitchTo(
+													in->traversalMemoryContext);
+				BOX		   *box = box_copy(&boxes[i - 1]);
+
+				MemoryContextSwitchTo(oldCtx);
+
+				out->traversalValues[out->nNodes] = box;
+
+				spg_point_distance(BoxPGetDatum(box),
+								   in->norderbys, in->orderbyKeys,
+								   &out->distances[out->nNodes], false);
+			}
+
+			out->nNodes++;
+		}
 	}
 
 	/* Set up level increments, too */
diff --git a/src/backend/access/spgist/spgproc.c b/src/backend/access/spgist/spgproc.c
new file mode 100644
index 0000000..ac386a4
--- /dev/null
+++ b/src/backend/access/spgist/spgproc.c
@@ -0,0 +1,67 @@
+/*-------------------------------------------------------------------------
+ *
+ * spgproc.c
+ *	  Common procedures for SP-GiST.
+ *
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *			src/backend/access/spgist/spgproc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/spgist_private.h"
+#include "utils/geo_decls.h"
+
+/* Point-box distance in the assumption that box is aligned by axis */
+static double
+point_box_distance(Point *point, BOX *box)
+{
+	double		dx,
+				dy;
+
+	if (isnan(point->x) || isnan(box->low.x) ||
+		isnan(point->y) || isnan(box->low.y))
+		return get_float8_nan();
+
+	if (point->x < box->low.x)
+		dx = box->low.x - point->x;
+	else if (point->x > box->high.x)
+		dx = point->x - box->high.x;
+	else
+		dx = 0.0;
+
+	if (point->y < box->low.y)
+		dy = box->low.y - point->y;
+	else if (point->y > box->high.y)
+		dy = point->y - box->high.y;
+	else
+		dy = 0.0;
+
+	return HYPOT(dx, dy);
+}
+
+void
+spg_point_distance(Datum to, int norderbys, ScanKey orderbyKeys,
+				   double **distances, bool isLeaf)
+{
+	double	   *distance;
+	int			sk_num;
+
+	distance = *distances = (double *) palloc(norderbys * sizeof(double));
+
+	for (sk_num = 0; sk_num < norderbys; ++sk_num, ++orderbyKeys, ++distance)
+	{
+		Point	   *point = DatumGetPointP(orderbyKeys->sk_argument);
+
+		*distance = isLeaf ? point_dt(point, DatumGetPointP(to))
+						   : point_box_distance(point, DatumGetBoxP(to));
+	}
+}
diff --git a/src/backend/access/spgist/spgquadtreeproc.c b/src/backend/access/spgist/spgquadtreeproc.c
index 6ad73f4..ede9ec5 100644
--- a/src/backend/access/spgist/spgquadtreeproc.c
+++ b/src/backend/access/spgist/spgquadtreeproc.c
@@ -17,6 +17,7 @@
 
 #include "access/spgist.h"
 #include "access/stratnum.h"
+#include "access/spgist_private.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
 #include "utils/geo_decls.h"
@@ -77,6 +78,32 @@ getQuadrant(Point *centroid, Point *tst)
 	return 0;
 }
 
+/* Returns bounding box of a given quadrant */
+static BOX *
+getQuadrantArea(BOX *area, Point *centroid, int quadrant)
+{
+	BOX		   *box = (BOX *) palloc(sizeof(BOX));
+
+	switch (quadrant)
+	{
+		case 1:
+			box->high = area->high;
+			box->low = *centroid;
+			break;
+		case 2:
+			box_fill(box, centroid->x, area->high.x, area->low.y, centroid->y);
+			break;
+		case 3:
+			box->high = *centroid;
+			box->low = area->low;
+			break;
+		case 4:
+			box_fill(box, area->low.x, centroid->x, centroid->y, area->high.y);
+			break;
+	}
+
+	return box;
+}
 
 Datum
 spg_quad_choose(PG_FUNCTION_ARGS)
@@ -196,19 +223,56 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS)
 	spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
 	spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
 	Point	   *centroid;
+	BOX			infArea;
+	BOX		   *area = NULL;
 	int			which;
 	int			i;
 
 	Assert(in->hasPrefix);
 	centroid = DatumGetPointP(in->prefixDatum);
 
+	if (in->norderbys > 0)
+	{
+		out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
+		out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
+
+		if (in->level == 0)
+		{
+			double		inf = get_float8_infinity();
+
+			area = box_fill(&infArea, -inf, inf, -inf, inf);
+		}
+		else
+		{
+			area = in->traversalValue;
+			Assert(area);
+		}
+	}
+
 	if (in->allTheSame)
 	{
 		/* Report that all nodes should be visited */
 		out->nNodes = in->nNodes;
 		out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
 		for (i = 0; i < in->nNodes; i++)
+		{
 			out->nodeNumbers[i] = i;
+
+			if (in->norderbys > 0)
+			{
+				MemoryContext oldCtx = MemoryContextSwitchTo(
+													in->traversalMemoryContext);
+				/* Use parent quadrant box as traversalValue */
+				BOX		   *quadrant = box_copy(area);
+
+				MemoryContextSwitchTo(oldCtx);
+
+				out->traversalValues[i] = quadrant;
+				spg_point_distance(BoxPGetDatum(quadrant),
+								   in->norderbys, in->orderbyKeys,
+								   &out->distances[i], false);
+			}
+		}
 		PG_RETURN_VOID();
 	}
 
@@ -253,8 +317,8 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS)
 				boxQuery = DatumGetBoxP(in->scankeys[i].sk_argument);
 
 				if (DatumGetBool(DirectFunctionCall2(box_contain_pt,
-												   PointerGetDatum(boxQuery),
-												 PointerGetDatum(centroid))))
+													 PointerGetDatum(boxQuery),
+													PointerGetDatum(centroid))))
 				{
 					/* centroid is in box, so all quadrants are OK */
 				}
@@ -286,13 +350,37 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS)
 			break;				/* no need to consider remaining conditions */
 	}
 
+	out->levelAdds = palloc(sizeof(int) * 4);
+	for (i = 0; i < 4; ++i)
+		out->levelAdds[i] = 1;
+
 	/* We must descend into the quadrant(s) identified by which */
 	out->nodeNumbers = (int *) palloc(sizeof(int) * 4);
 	out->nNodes = 0;
+
 	for (i = 1; i <= 4; i++)
 	{
 		if (which & (1 << i))
-			out->nodeNumbers[out->nNodes++] = i - 1;
+		{
+			out->nodeNumbers[out->nNodes] = i - 1;
+
+			if (in->norderbys > 0)
+			{
+				MemoryContext oldCtx = MemoryContextSwitchTo(
+													in->traversalMemoryContext);
+				BOX		   *quadrant = getQuadrantArea(area, centroid, i);
+
+				MemoryContextSwitchTo(oldCtx);
+
+				out->traversalValues[out->nNodes] = quadrant;
+
+				spg_point_distance(BoxPGetDatum(quadrant),
+								   in->norderbys, in->orderbyKeys,
+								   &out->distances[out->nNodes], false);
+			}
+
+			out->nNodes++;
+		}
 	}
 
 	PG_RETURN_VOID();
@@ -356,5 +444,10 @@ spg_quad_leaf_consistent(PG_FUNCTION_ARGS)
 			break;
 	}
 
+	if (res && in->norderbys > 0)
+		/* ok, it passes -> let's compute the distances */
+		spg_point_distance(in->leafDatum,
+					  in->norderbys, in->orderbykeys, &out->distances, true);
+
 	PG_RETURN_BOOL(res);
 }
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index f585b62..291159a 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -43,11 +43,23 @@ pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a,
 	IndexScanDesc scan = (IndexScanDesc) arg;
 	int			i;
 
-	/* Order according to distance comparison */
-	for (i = 0; i < scan->numberOfOrderBys; i++)
+	if (sa->isnull)
 	{
-		if (sa->distances[i] != sb->distances[i])
-			return (sa->distances[i] < sb->distances[i]) ? 1 : -1;
+		if (!sb->isnull)
+			return -1;
+	}
+	else if (sb->isnull)
+	{
+		return 1;
+	}
+	else
+	{
+		/* Order according to distance comparison */
+		for (i = 0; i < scan->numberOfOrderBys; i++)
+		{
+			if (sa->distances[i] != sb->distances[i])
+				return (sa->distances[i] < sb->distances[i]) ? 1 : -1;
+		}
 	}
 
 	/* Leaf items go before inner pages, to ensure a depth-first search */
@@ -81,7 +93,10 @@ static void
 spgAddSearchItemToQueue(SpGistScanOpaque so, SpGistSearchItem *item,
 						double *distances)
 {
-	memcpy(item->distances, distances, so->numberOfOrderBys * sizeof(double));
+	if (!item->isnull)
+		memcpy(item->distances, distances,
+			   so->numberOfOrderBys * sizeof(double));
+
 	pairingheap_add(so->queue, &item->phNode);
 }
 
@@ -126,7 +141,8 @@ resetSpGistScanOpaque(SpGistScanOpaque so)
 		int			i;
 
 		for (i = 0; i < so->nPtrs; i++)
-			pfree(so->distances[i]);
+			if (so->distances[i])
+				pfree(so->distances[i]);
 	}
 
 	if (so->want_itup)
@@ -828,9 +844,14 @@ storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
 
 	if (so->numberOfOrderBys > 0)
 	{
-		Size		size = sizeof(double) * so->numberOfOrderBys;
+		if (isnull)
+			so->distances[so->nPtrs] = NULL;
+		else
+		{
+			Size		size = sizeof(double) * so->numberOfOrderBys;
 
-		so->distances[so->nPtrs] = memcpy(palloc(size), distances, size);
+			so->distances[so->nPtrs] = memcpy(palloc(size), distances, size);
+		}
 	}
 
 	if (so->want_itup)
@@ -880,7 +901,8 @@ spggettuple(IndexScanDesc scan, ScanDirection dir)
 			int			i;
 
 			for (i = 0; i < so->nPtrs; i++)
-				pfree(so->distances[i]);
+				if (so->distances[i])
+					pfree(so->distances[i]);
 		}
 
 		if (so->want_itup)
diff --git a/src/backend/access/spgist/spgtextproc.c b/src/backend/access/spgist/spgtextproc.c
index 00eb27f..8678854 100644
--- a/src/backend/access/spgist/spgtextproc.c
+++ b/src/backend/access/spgist/spgtextproc.c
@@ -86,7 +86,6 @@ spg_text_config(PG_FUNCTION_ARGS)
 	cfg->labelType = INT2OID;
 	cfg->canReturnData = true;
 	cfg->longValuesOK = true;	/* suffixing will shorten long values */
-	cfg->suppLen = 0; /* we don't need any supplimentary data */
 	PG_RETURN_VOID();
 }
 
diff --git a/src/include/access/spgist_private.h b/src/include/access/spgist_private.h
index 14d0700..e1b6abf 100644
--- a/src/include/access/spgist_private.h
+++ b/src/include/access/spgist_private.h
@@ -685,4 +685,8 @@ extern void spgPageIndexMultiDelete(SpGistState *state, Page page,
 extern bool spgdoinsert(Relation index, SpGistState *state,
 			ItemPointer heapPtr, Datum datum, bool isnull);
 
+/* spgproc.c */
+extern void spg_point_distance(Datum to, int norderbys,
+				   ScanKey orderbyKeys, double **distances, bool isLeaf);
+
 #endif   /* SPGIST_PRIVATE_H */
diff --git a/src/include/catalog/pg_amop.h b/src/include/catalog/pg_amop.h
index 0251664..066cb46 100644
--- a/src/include/catalog/pg_amop.h
+++ b/src/include/catalog/pg_amop.h
@@ -764,6 +764,7 @@ DATA(insert (	4015   600 600 5 s	508 4000 0 ));
 DATA(insert (	4015   600 600 10 s 509 4000 0 ));
 DATA(insert (	4015   600 600 6 s	510 4000 0 ));
 DATA(insert (	4015   600 603 8 s	511 4000 0 ));
+DATA(insert (	4015   600 600 15 o 517 4000 1970 ));
 
 /*
  * SP-GiST kd_point_ops
@@ -774,6 +775,7 @@ DATA(insert (	4016   600 600 5 s	508 4000 0 ));
 DATA(insert (	4016   600 600 10 s 509 4000 0 ));
 DATA(insert (	4016   600 600 6 s	510 4000 0 ));
 DATA(insert (	4016   600 603 8 s	511 4000 0 ));
+DATA(insert (	4016   600 600 15 o 517 4000 1970 ));
 
 /*
  * SP-GiST text_ops
diff --git a/src/test/regress/expected/amutils.out b/src/test/regress/expected/amutils.out
index 74f7c9f..1464021 100644
--- a/src/test/regress/expected/amutils.out
+++ b/src/test/regress/expected/amutils.out
@@ -81,7 +81,8 @@ select prop,
        pg_index_column_has_property('onek_hundred'::regclass, 1, prop) as btree,
        pg_index_column_has_property('hash_i4_index'::regclass, 1, prop) as hash,
        pg_index_column_has_property('gcircleind'::regclass, 1, prop) as gist,
-       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist,
+       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist_radix,
+       pg_index_column_has_property('sp_quad_ind'::regclass, 1, prop) as spgist_quad,
        pg_index_column_has_property('botharrayidx'::regclass, 1, prop) as gin,
        pg_index_column_has_property('brinidx'::regclass, 1, prop) as brin
   from unnest(array['asc', 'desc', 'nulls_first', 'nulls_last',
@@ -90,18 +91,18 @@ select prop,
                     'bogus']::text[])
          with ordinality as u(prop,ord)
  order by ord;
-        prop        | btree | hash | gist | spgist | gin | brin 
---------------------+-------+------+------+--------+-----+------
- asc                | t     | f    | f    | f      | f   | f
- desc               | f     | f    | f    | f      | f   | f
- nulls_first        | f     | f    | f    | f      | f   | f
- nulls_last         | t     | f    | f    | f      | f   | f
- orderable          | t     | f    | f    | f      | f   | f
- distance_orderable | f     | f    | t    | f      | f   | f
- returnable         | t     | f    | f    | t      | f   | f
- search_array       | t     | f    | f    | f      | f   | f
- search_nulls       | t     | f    | t    | t      | f   | t
- bogus              |       |      |      |        |     | 
+        prop        | btree | hash | gist | spgist_radix | spgist_quad | gin | brin 
+--------------------+-------+------+------+--------------+-------------+-----+------
+ asc                | t     | f    | f    | f            | f           | f   | f
+ desc               | f     | f    | f    | f            | f           | f   | f
+ nulls_first        | f     | f    | f    | f            | f           | f   | f
+ nulls_last         | t     | f    | f    | f            | f           | f   | f
+ orderable          | t     | f    | f    | f            | f           | f   | f
+ distance_orderable | f     | f    | t    | f            | t           | f   | f
+ returnable         | t     | f    | f    | t            | t           | f   | f
+ search_array       | t     | f    | f    | f            | f           | f   | f
+ search_nulls       | t     | f    | t    | t            | t           | f   | t
+ bogus              |       |      |      |              |             |     | 
 (10 rows)
 
 select prop,
diff --git a/src/test/regress/expected/create_index.out b/src/test/regress/expected/create_index.out
index e519fdb..ab6e244 100644
--- a/src/test/regress/expected/create_index.out
+++ b/src/test/regress/expected/create_index.out
@@ -294,6 +294,15 @@ SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
      1
 (1 row)
 
+CREATE TEMP TABLE quad_point_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+CREATE TEMP TABLE quad_point_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+CREATE TEMP TABLE quad_point_tbl_ord_seq3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
  count 
 -------
@@ -883,6 +892,74 @@ SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 (1 row)
 
 EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Order By: (p <-> '(0,0)'::point)
+(3 rows)
+
+CREATE TEMP TABLE quad_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN quad_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+   n   | dist | p |   n   | dist | p 
+-------+------+---+-------+------+---
+ 11001 |      |   |       |      | 
+ 11001 |      |   |       |      | 
+ 11001 |      |   |       |      | 
+       |      |   | 11001 |      | 
+       |      |   | 11001 |      | 
+       |      |   | 11001 |      | 
+(6 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Index Cond: (p <@ '(1000,1000),(200,200)'::box)
+         Order By: (p <-> '(0,0)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN quad_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Index Cond: (p IS NOT NULL)
+         Order By: (p <-> '(333,400)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN quad_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
 SELECT count(*) FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
                        QUERY PLAN                        
 ---------------------------------------------------------
@@ -988,6 +1065,71 @@ SELECT count(*) FROM kd_point_tbl WHERE p ~= '(4585, 365)';
 (1 row)
 
 EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+                      QUERY PLAN                       
+-------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_kd_ind on kd_point_tbl
+         Order By: (p <-> '(0,0)'::point)
+(3 rows)
+
+CREATE TEMP TABLE kd_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN kd_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+                       QUERY PLAN                        
+---------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_kd_ind on kd_point_tbl
+         Index Cond: (p <@ '(1000,1000),(200,200)'::box)
+         Order By: (p <-> '(0,0)'::point)
+(4 rows)
+
+CREATE TEMP TABLE kd_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN kd_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+                      QUERY PLAN                       
+-------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_kd_ind on kd_point_tbl
+         Index Cond: (p IS NOT NULL)
+         Order By: (p <-> '(333,400)'::point)
+(4 rows)
+
+CREATE TEMP TABLE kd_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN kd_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
                          QUERY PLAN                         
 ------------------------------------------------------------
diff --git a/src/test/regress/expected/opr_sanity.out b/src/test/regress/expected/opr_sanity.out
index 0bcec13..5e98601 100644
--- a/src/test/regress/expected/opr_sanity.out
+++ b/src/test/regress/expected/opr_sanity.out
@@ -1816,6 +1816,7 @@ ORDER BY 1, 2, 3;
        4000 |           12 | <=
        4000 |           12 | |&>
        4000 |           14 | >=
+       4000 |           15 | <->
        4000 |           15 | >
        4000 |           16 | @>
        4000 |           18 | =
@@ -1828,7 +1829,7 @@ ORDER BY 1, 2, 3;
        4000 |           25 | <<=
        4000 |           26 | >>
        4000 |           27 | >>=
-(121 rows)
+(122 rows)
 
 -- Check that all opclass search operators have selectivity estimators.
 -- This is not absolutely required, but it seems a reasonable thing
diff --git a/src/test/regress/sql/amutils.sql b/src/test/regress/sql/amutils.sql
index cec1dcb..fa178e1 100644
--- a/src/test/regress/sql/amutils.sql
+++ b/src/test/regress/sql/amutils.sql
@@ -40,7 +40,8 @@ select prop,
        pg_index_column_has_property('onek_hundred'::regclass, 1, prop) as btree,
        pg_index_column_has_property('hash_i4_index'::regclass, 1, prop) as hash,
        pg_index_column_has_property('gcircleind'::regclass, 1, prop) as gist,
-       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist,
+       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist_radix,
+       pg_index_column_has_property('sp_quad_ind'::regclass, 1, prop) as spgist_quad,
        pg_index_column_has_property('botharrayidx'::regclass, 1, prop) as gin,
        pg_index_column_has_property('brinidx'::regclass, 1, prop) as brin
   from unnest(array['asc', 'desc', 'nulls_first', 'nulls_last',
diff --git a/src/test/regress/sql/create_index.sql b/src/test/regress/sql/create_index.sql
index 1648072..5db3153 100644
--- a/src/test/regress/sql/create_index.sql
+++ b/src/test/regress/sql/create_index.sql
@@ -198,6 +198,18 @@ SELECT count(*) FROM quad_point_tbl WHERE p >^ '(5000, 4000)';
 
 SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 
+CREATE TEMP TABLE quad_point_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+
+CREATE TEMP TABLE quad_point_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+
+CREATE TEMP TABLE quad_point_tbl_ord_seq3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
 
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcde';
@@ -362,6 +374,36 @@ SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 
 EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+CREATE TEMP TABLE quad_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN quad_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+CREATE TEMP TABLE quad_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN quad_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+CREATE TEMP TABLE quad_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN quad_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND seq.dist = idx.dist AND seq.p ~= idx.p
+WHERE seq.dist IS NULL OR idx.dist IS NULL;
+
+EXPLAIN (COSTS OFF)
 SELECT count(*) FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
 SELECT count(*) FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
 
@@ -390,6 +432,39 @@ SELECT count(*) FROM kd_point_tbl WHERE p ~= '(4585, 365)';
 SELECT count(*) FROM kd_point_tbl WHERE p ~= '(4585, 365)';
 
 EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+CREATE TEMP TABLE kd_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN kd_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+CREATE TEMP TABLE kd_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN kd_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+CREATE TEMP TABLE kd_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN kd_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
+EXPLAIN (COSTS OFF)
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
 
diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c
index 748e568..7073146 100644
--- a/src/backend/access/spgist/spgdoinsert.c
+++ b/src/backend/access/spgist/spgdoinsert.c
@@ -1902,8 +1902,28 @@ spgdoinsert(Relation index, SpGistState *state,
 	 * cycles in the loop below.
 	 */
 	if (!isnull)
+	{
 		procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
 
+		/* Compress the data if the corresponding support function exists. */
+		if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
+		{
+			spgCompressIn in;
+			spgCompressIn out;
+			FmgrInfo   *compress = index_getprocinfo(index, 1,
+													 SPGIST_COMPRESS_PROC);
+
+			in.datum = datum;
+
+			FunctionCall2Coll(compress,
+							  index->rd_indcollation[0],
+							  PointerGetDatum(&in),
+							  PointerGetDatum(&out));
+
+			datum = out.datum;
+		}
+	}
+
 	/*
 	 * Since we don't use index_form_tuple in this AM, we have to make sure
 	 * value to be inserted is not toasted; FormIndexDatum doesn't guarantee
diff --git a/src/backend/access/spgist/spgvalidate.c b/src/backend/access/spgist/spgvalidate.c
index 0a3eeb6..02cc908 100644
--- a/src/backend/access/spgist/spgvalidate.c
+++ b/src/backend/access/spgist/spgvalidate.c
@@ -26,6 +26,7 @@
 #include "utils/regproc.h"
 #include "utils/syscache.h"
 
+#define SPGIST_OPTIONAL_PROCS_MASK (((uint64) 1) << SPGIST_COMPRESS_PROC)
 
 /*
  * Validator for an SP-GiST opclass.
@@ -104,6 +105,7 @@ spgvalidate(Oid opclassoid)
 			case SPGIST_CHOOSE_PROC:
 			case SPGIST_PICKSPLIT_PROC:
 			case SPGIST_INNER_CONSISTENT_PROC:
+			case SPGIST_COMPRESS_PROC:
 				ok = check_amproc_signature(procform->amproc, VOIDOID, true,
 											2, 2, INTERNALOID, INTERNALOID);
 				break;
@@ -224,6 +226,8 @@ spgvalidate(Oid opclassoid)
 		{
 			if ((thisgroup->functionset & (((uint64) 1) << i)) != 0)
 				continue;		/* got it */
+			if ((SPGIST_OPTIONAL_PROCS_MASK & (((uint64) 1) << i)) != 0)
+				continue;		/* support function is optional */
 			ereport(INFO,
 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
 					 errmsg("spgist operator family \"%s\" is missing support function %d for type %s",
diff --git a/src/include/access/spgist.h b/src/include/access/spgist.h
index 23ed9bb..2927605 100644
--- a/src/include/access/spgist.h
+++ b/src/include/access/spgist.h
@@ -30,7 +30,8 @@
 #define SPGIST_PICKSPLIT_PROC			3
 #define SPGIST_INNER_CONSISTENT_PROC	4
 #define SPGIST_LEAF_CONSISTENT_PROC		5
-#define SPGISTNProc						5
+#define SPGIST_COMPRESS_PROC			6
+#define SPGISTNProc						6
 
 /*
  * Argument structs for spg_config method
@@ -189,6 +190,19 @@ typedef struct spgLeafConsistentOut
 	double	   *distances;		/* associated distances */
 } spgLeafConsistentOut;
 
+/*
+ * Argument structs for spg_compress method
+ */
+typedef struct spgCompressIn
+{
+	Datum		datum;			/* data to be compressed for storage,
+								 * can be toasted */
+} spgCompressIn;
+
+typedef struct spgCompressOut
+{
+	Datum		datum;			/* compressed data to be stored at leaf */
+} spgCompressOut;
 
 /* spgutils.c */
 extern bytea *spgoptions(Datum reloptions, bool validate);
diff --git a/doc/src/sgml/spgist.sgml b/doc/src/sgml/spgist.sgml
index b0d1cb3..491e417 100644
--- a/doc/src/sgml/spgist.sgml
+++ b/doc/src/sgml/spgist.sgml
@@ -142,6 +142,48 @@
       </entry>
      </row>
      <row>
+      <entry><literal>circle_ops</></entry>
+      <entry><type>circle</></entry>
+      <entry>
+       <literal>&lt;&lt;</>
+       <literal>&amp;&lt;</>
+       <literal>&amp;&amp;</>
+       <literal>&amp;&gt;</>
+       <literal>&gt;&gt;</>
+       <literal>~=</>
+       <literal>@&gt;</>
+       <literal>&lt;@</>
+       <literal>&amp;&lt;|</>
+       <literal>&lt;&lt;|</>
+       <literal>|&gt;&gt;</>
+       <literal>|&amp;&gt;</>
+      </entry>
+      <entry>
+       <literal>&lt;-&gt;</>
+      </entry>
+     </row>
+     <row>
+      <entry><literal>poly_ops</></entry>
+      <entry><type>polygon</></entry>
+      <entry>
+       <literal>&lt;&lt;</>
+       <literal>&amp;&lt;</>
+       <literal>&amp;&amp;</>
+       <literal>&amp;&gt;</>
+       <literal>&gt;&gt;</>
+       <literal>~=</>
+       <literal>@&gt;</>
+       <literal>&lt;@</>
+       <literal>&amp;&lt;|</>
+       <literal>&lt;&lt;|</>
+       <literal>|&gt;&gt;</>
+       <literal>|&amp;&gt;</>
+      </entry>
+      <entry>
+       <literal>&lt;-&gt;</>
+      </entry>
+     </row>
+     <row>
       <entry><literal>text_ops</></entry>
       <entry><type>text</></entry>
       <entry>
diff --git a/src/backend/utils/adt/geo_spgist.c b/src/backend/utils/adt/geo_spgist.c
index c95ec1c..bd0284f 100644
--- a/src/backend/utils/adt/geo_spgist.c
+++ b/src/backend/utils/adt/geo_spgist.c
@@ -74,9 +74,11 @@
 #include "postgres.h"
 
 #include "access/spgist.h"
+#include "access/spgist_private.h"
 #include "access/stratnum.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/geo_decls.h"
 
 /*
@@ -366,6 +368,31 @@ overAbove4D(RectBox *rect_box, RangeBox *query)
 	return overHigher2D(&rect_box->range_box_y, &query->right);
 }
 
+/* Lower bound for the distance between point and rect_box */
+static double
+pointToRectBoxDistance(Point *point, RectBox *rect_box)
+{
+	double		dx;
+	double		dy;
+
+	if (point->x < rect_box->range_box_x.left.low)
+		dx = rect_box->range_box_x.left.low - point->x;
+	else if (point->x > rect_box->range_box_x.right.high)
+		dx = point->x - rect_box->range_box_x.right.high;
+	else
+		dx = 0;
+
+	if (point->y < rect_box->range_box_y.left.low)
+		dy = rect_box->range_box_y.left.low - point->y;
+	else if (point->y > rect_box->range_box_y.right.high)
+		dy = point->y - rect_box->range_box_y.right.high;
+	else
+		dy = 0;
+
+	return HYPOT(dx, dy);
+}
+
+
 /*
  * SP-GiST config function
  */
@@ -473,6 +500,64 @@ spg_box_quad_picksplit(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/* get circle bounding box */
+static BOX *
+circle_bbox(CIRCLE *circle)
+{
+	BOX		   *bbox = (BOX *) palloc(sizeof(BOX));
+
+	bbox->high.x = circle->center.x + circle->radius;
+	bbox->low.x = circle->center.x - circle->radius;
+	bbox->high.y = circle->center.y + circle->radius;
+	bbox->low.y = circle->center.y - circle->radius;
+
+	return bbox;
+}
+
+static bool
+is_bounding_box_test_exact(StrategyNumber strategy)
+{
+	switch (strategy)
+	{
+		case RTLeftStrategyNumber:
+		case RTOverLeftStrategyNumber:
+		case RTOverRightStrategyNumber:
+		case RTRightStrategyNumber:
+		case RTOverBelowStrategyNumber:
+		case RTBelowStrategyNumber:
+		case RTAboveStrategyNumber:
+		case RTOverAboveStrategyNumber:
+			return true;
+
+		default:
+			return false;
+	}
+}
+
+static BOX *
+spg_box_quad_get_scankey_bbox(ScanKey sk, bool *recheck)
+{
+	switch (sk->sk_subtype)
+	{
+		case BOXOID:
+			return DatumGetBoxP(sk->sk_argument);
+
+		case CIRCLEOID:
+			if (recheck && !is_bounding_box_test_exact(sk->sk_strategy))
+				*recheck = true;
+			return circle_bbox(DatumGetCircleP(sk->sk_argument));
+
+		case POLYGONOID:
+			if (recheck && !is_bounding_box_test_exact(sk->sk_strategy))
+				*recheck = true;
+			return &DatumGetPolygonP(sk->sk_argument)->boundbox;
+
+		default:
+			elog(ERROR, "unrecognized scankey subtype: %d", sk->sk_subtype);
+			return NULL;
+	}
+}
+
 /*
  * SP-GiST inner consistent function
  */
@@ -488,6 +573,15 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
 	RangeBox   *centroid,
 			  **queries;
 
+	/*
+	 * We are saving the traversal value or initialize it an unbounded one, if
+	 * we have just begun to walk the tree.
+	 */
+	if (in->traversalValue)
+		rect_box = in->traversalValue;
+	else
+		rect_box = initRectBox();
+
 	if (in->allTheSame)
 	{
 		/* Report that all nodes should be visited */
@@ -496,31 +590,51 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
 		for (i = 0; i < in->nNodes; i++)
 			out->nodeNumbers[i] = i;
 
+		if (in->norderbys > 0 && in->nNodes > 0)
+		{
+			double	   *distances = palloc(sizeof(double) * in->norderbys);
+			int			j;
+
+			for (j = 0; j < in->norderbys; j++)
+			{
+				Point	   *pt = DatumGetPointP(in->orderbyKeys[j].sk_argument);
+
+				distances[j] = pointToRectBoxDistance(pt, rect_box);
+			}
+
+			out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
+			out->distances[0] = distances;
+
+			for (i = 1; i < in->nNodes; i++)
+			{
+				out->distances[i] = palloc(sizeof(double) * in->norderbys);
+				memcpy(out->distances[i], distances,
+					   sizeof(double) * in->norderbys);
+			}
+		}
+
 		PG_RETURN_VOID();
 	}
 
 	/*
-	 * We are saving the traversal value or initialize it an unbounded one, if
-	 * we have just begun to walk the tree.
-	 */
-	if (in->traversalValue)
-		rect_box = in->traversalValue;
-	else
-		rect_box = initRectBox();
-
-	/*
 	 * We are casting the prefix and queries to RangeBoxes for ease of the
 	 * following operations.
 	 */
 	centroid = getRangeBox(DatumGetBoxP(in->prefixDatum));
 	queries = (RangeBox **) palloc(in->nkeys * sizeof(RangeBox *));
 	for (i = 0; i < in->nkeys; i++)
-		queries[i] = getRangeBox(DatumGetBoxP(in->scankeys[i].sk_argument));
+	{
+		BOX		   *box = spg_box_quad_get_scankey_bbox(&in->scankeys[i], NULL);
+
+		queries[i] = getRangeBox(box);
+	}
 
 	/* Allocate enough memory for nodes */
 	out->nNodes = 0;
 	out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
 	out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
+	if (in->norderbys > 0)
+		out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
 
 	/*
 	 * We switch memory context, because we want to allocate memory for new
@@ -598,6 +712,22 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
 		{
 			out->traversalValues[out->nNodes] = next_rect_box;
 			out->nodeNumbers[out->nNodes] = quadrant;
+
+			if (in->norderbys > 0)
+			{
+				double	   *distances = palloc(sizeof(double) * in->norderbys);
+				int			j;
+
+				out->distances[out->nNodes] = distances;
+
+				for (j = 0; j < in->norderbys; j++)
+				{
+					Point *pt = DatumGetPointP(in->orderbyKeys[j].sk_argument);
+
+					distances[j] = pointToRectBoxDistance(pt, next_rect_box);
+				}
+			}
+
 			out->nNodes++;
 		}
 		else
@@ -638,7 +768,9 @@ spg_box_quad_leaf_consistent(PG_FUNCTION_ARGS)
 	for (i = 0; i < in->nkeys; i++)
 	{
 		StrategyNumber strategy = in->scankeys[i].sk_strategy;
-		Datum		query = in->scankeys[i].sk_argument;
+		BOX		   *box = 	spg_box_quad_get_scankey_bbox(&in->scankeys[i],
+														  &out->recheck);
+		Datum		query = BoxPGetDatum(box);
 
 		switch (strategy)
 		{
@@ -711,5 +843,63 @@ spg_box_quad_leaf_consistent(PG_FUNCTION_ARGS)
 			break;
 	}
 
+	if (flag && in->norderbys > 0)
+	{
+		Oid			distfnoid = in->orderbykeys[0].sk_func.fn_oid;
+
+		spg_point_distance(leaf, in->norderbys, in->orderbykeys,
+						   &out->distances, false);
+
+		/* Recheck is necessary when computing distance to polygon or circle */
+		out->recheckDistances = distfnoid == F_DIST_CPOINT ||
+								distfnoid == F_DIST_POLYP;
+	}
+
 	PG_RETURN_BOOL(flag);
 }
+
+
+/*
+ * SP-GiST config function for 2-D types that are lossy represented by their
+ * bounding boxes
+ */
+Datum
+spg_bbox_quad_config(PG_FUNCTION_ARGS)
+{
+	spgConfigOut *cfg = (spgConfigOut *) PG_GETARG_POINTER(1);
+
+	cfg->prefixType = BOXOID;	/* A type represented by its bounding box */
+	cfg->labelType = VOIDOID;	/* We don't need node labels. */
+	cfg->canReturnData = false;
+	cfg->longValuesOK = false;
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SP-GiST compress function for circles
+ */
+Datum
+spg_circle_quad_compress(PG_FUNCTION_ARGS)
+{
+	spgCompressIn *in = (spgCompressIn *) PG_GETARG_POINTER(0);
+	spgCompressOut *out = (spgCompressOut *) PG_GETARG_POINTER(1);
+
+	out->datum = BoxPGetDatum(circle_bbox(DatumGetCircleP(in->datum)));
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * SP-GiST compress function for polygons
+ */
+Datum
+spg_poly_quad_compress(PG_FUNCTION_ARGS)
+{
+	spgCompressIn *in = (spgCompressIn *) PG_GETARG_POINTER(0);
+	spgCompressOut *out = (spgCompressOut *) PG_GETARG_POINTER(1);
+
+	out->datum = BoxPGetDatum(box_copy(&DatumGetPolygonP(in->datum)->boundbox));
+
+	PG_RETURN_VOID();
+}
diff --git a/src/include/catalog/pg_amop.h b/src/include/catalog/pg_amop.h
index 066cb46..722e5b4 100644
--- a/src/include/catalog/pg_amop.h
+++ b/src/include/catalog/pg_amop.h
@@ -848,6 +848,40 @@ DATA(insert (	5000	603  603 11 s	2573	4000 0 ));
 DATA(insert (	5000	603  603 12 s	2572	4000 0 ));
 
 /*
+ * SP-GiST circle_ops
+ */
+DATA(insert (	5007	718  718  1 s	1506	4000 0 ));
+DATA(insert (	5007	718  718  2 s	1507	4000 0 ));
+DATA(insert (	5007	718  718  3 s	1513	4000 0 ));
+DATA(insert (	5007	718  718  4 s	1508	4000 0 ));
+DATA(insert (	5007	718  718  5 s	1509	4000 0 ));
+DATA(insert (	5007	718  718  6 s	1512	4000 0 ));
+DATA(insert (	5007	718  718  7 s	1511	4000 0 ));
+DATA(insert (	5007	718  718  8 s	1510	4000 0 ));
+DATA(insert (	5007	718  718  9 s	2589	4000 0 ));
+DATA(insert (	5007	718  718 10 s	1515	4000 0 ));
+DATA(insert (	5007	718  718 11 s	1514	4000 0 ));
+DATA(insert (	5007	718  718 12 s	2590	4000 0 ));
+DATA(insert (	5007	718  600 15 o	3291	4000 1970 ));
+
+/*
+ * SP-GiST poly_ops (supports polygons)
+ */
+DATA(insert (	5008   604	604  1 s	 485	4000 0 ));
+DATA(insert (	5008   604	604  2 s	 486	4000 0 ));
+DATA(insert (	5008   604	604  3 s	 492	4000 0 ));
+DATA(insert (	5008   604	604  4 s	 487	4000 0 ));
+DATA(insert (	5008   604	604  5 s	 488	4000 0 ));
+DATA(insert (	5008   604	604  6 s	 491	4000 0 ));
+DATA(insert (	5008   604	604  7 s	 490	4000 0 ));
+DATA(insert (	5008   604	604  8 s	 489	4000 0 ));
+DATA(insert (	5008   604	604  9 s	2575	4000 0 ));
+DATA(insert (	5008   604	604 10 s	2574	4000 0 ));
+DATA(insert (	5008   604	604 11 s	2577	4000 0 ));
+DATA(insert (	5008   604	604 12 s	2576	4000 0 ));
+DATA(insert (	5008   604	600 15 o	3289	4000 1970 ));
+
+/*
  * GiST inet_ops
  */
 DATA(insert (	3550	869 869 3 s		3552 783 0 ));
diff --git a/src/include/catalog/pg_amproc.h b/src/include/catalog/pg_amproc.h
index f1a52ce..2de94f3 100644
--- a/src/include/catalog/pg_amproc.h
+++ b/src/include/catalog/pg_amproc.h
@@ -306,6 +306,18 @@ DATA(insert (	5000   603 603 2 5013 ));
 DATA(insert (	5000   603 603 3 5014 ));
 DATA(insert (	5000   603 603 4 5015 ));
 DATA(insert (	5000   603 603 5 5016 ));
+DATA(insert (	5007   718 718 1 5009 ));
+DATA(insert (	5007   718 718 2 5013 ));
+DATA(insert (	5007   718 718 3 5014 ));
+DATA(insert (	5007   718 718 4 5015 ));
+DATA(insert (	5007   718 718 5 5016 ));
+DATA(insert (	5007   718 718 6 5010 ));
+DATA(insert (	5008   604 604 1 5009 ));
+DATA(insert (	5008   604 604 2 5013 ));
+DATA(insert (	5008   604 604 3 5014 ));
+DATA(insert (	5008   604 604 4 5015 ));
+DATA(insert (	5008   604 604 5 5016 ));
+DATA(insert (	5008   604 604 6 5011 ));
 
 /* BRIN opclasses */
 /* minmax bytea */
diff --git a/src/include/catalog/pg_opclass.h b/src/include/catalog/pg_opclass.h
index 0cde14c..d8eded9 100644
--- a/src/include/catalog/pg_opclass.h
+++ b/src/include/catalog/pg_opclass.h
@@ -203,6 +203,8 @@ DATA(insert (	4000	box_ops				PGNSP PGUID 5000  603  t 0 ));
 DATA(insert (	4000	quad_point_ops		PGNSP PGUID 4015  600 t 0 ));
 DATA(insert (	4000	kd_point_ops		PGNSP PGUID 4016  600 f 0 ));
 DATA(insert (	4000	text_ops			PGNSP PGUID 4017  25 t 0 ));
+DATA(insert (	4000	circle_ops			PGNSP PGUID 5007  718 t 603 ));
+DATA(insert (	4000	poly_ops			PGNSP PGUID 5008  604 t 603 ));
 DATA(insert (	403		jsonb_ops			PGNSP PGUID 4033  3802 t 0 ));
 DATA(insert (	405		jsonb_ops			PGNSP PGUID 4034  3802 t 0 ));
 DATA(insert (	2742	jsonb_ops			PGNSP PGUID 4036  3802 t 25 ));
diff --git a/src/include/catalog/pg_opfamily.h b/src/include/catalog/pg_opfamily.h
index bd673fe..4bb26cd 100644
--- a/src/include/catalog/pg_opfamily.h
+++ b/src/include/catalog/pg_opfamily.h
@@ -183,5 +183,7 @@ DATA(insert OID = 4103 (	3580	range_inclusion_ops		PGNSP PGUID ));
 DATA(insert OID = 4082 (	3580	pg_lsn_minmax_ops		PGNSP PGUID ));
 DATA(insert OID = 4104 (	3580	box_inclusion_ops		PGNSP PGUID ));
 DATA(insert OID = 5000 (	4000	box_ops		PGNSP PGUID ));
+DATA(insert OID = 5007 (	4000	circle_ops				PGNSP PGUID ));
+DATA(insert OID = 5008 (	4000	poly_ops				PGNSP PGUID ));
 
 #endif   /* PG_OPFAMILY_H */
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 05652e8..2817606 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5186,6 +5186,13 @@ DESCR("SP-GiST support for quad tree over box");
 DATA(insert OID = 5016 (  spg_box_quad_leaf_consistent	PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 16 "2281 2281" _null_ _null_ _null_ _null_  _null_ spg_box_quad_leaf_consistent _null_ _null_ _null_ ));
 DESCR("SP-GiST support for quad tree over box");
 
+DATA(insert OID = 5009 (  spg_bbox_quad_config PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 2278 "2281 2281" _null_ _null_ _null_ _null_  _null_ spg_bbox_quad_config _null_ _null_ _null_ ));
+DESCR("SP-GiST support for quad tree over 2-D types represented by their bounding boxes");
+DATA(insert OID = 5010 (  spg_circle_quad_compress PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 2278 "2281 2281" _null_ _null_ _null_ _null_  _null_ spg_circle_quad_compress _null_ _null_ _null_ ));
+DESCR("SP-GiST support for quad tree over circle");
+DATA(insert OID = 5011 (  spg_poly_quad_compress PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 2278 "2281 2281" _null_ _null_ _null_ _null_  _null_ spg_poly_quad_compress _null_ _null_ _null_ ));
+DESCR("SP-GiST support for quad tree over polygons");
+
 /* replication slots */
 DATA(insert OID = 3779 (  pg_create_physical_replication_slot PGNSP PGUID 12 1 0 0 0 f f f f t f v u 3 0 2249 "19 16 16" "{19,16,16,19,3220}" "{i,i,i,o,o}" "{slot_name,immediately_reserve,temporary,slot_name,xlog_position}" _null_ _null_ pg_create_physical_replication_slot _null_ _null_ _null_ ));
 DESCR("create a physical replication slot");
diff --git a/src/test/regress/expected/circle.out b/src/test/regress/expected/circle.out
index 9ba4a04..2d4bf70 100644
--- a/src/test/regress/expected/circle.out
+++ b/src/test/regress/expected/circle.out
@@ -97,3 +97,291 @@ SELECT '' as five, c1.f1 AS one, c2.f1 AS two, (c1.f1 <-> c2.f1) AS distance
       | <(1,2),3>      | <(100,200),10> | 208.370729772479
 (5 rows)
 
+--
+-- Test the SP-GiST index
+--
+CREATE TEMPORARY TABLE quad_circle_tbl (id int, c circle);
+INSERT INTO quad_circle_tbl
+	SELECT (x - 1) * 100 + y, circle(point(x * 10, y * 10), 1 + (x + y) % 10)
+	FROM generate_series(1, 100) x,
+		 generate_series(1, 100) y;
+INSERT INTO quad_circle_tbl
+	SELECT i, '<(200, 300), 5>'
+	FROM generate_series(10001, 11000) AS i;
+INSERT INTO quad_circle_tbl
+	VALUES
+		(11001, NULL),
+		(11002, NULL),
+		(11003, '<(0,100), infinity>'),
+		(11004, '<(-infinity,0),1000>'),
+		(11005, '<(infinity,-infinity),infinity>');
+CREATE INDEX quad_circle_tbl_idx ON quad_circle_tbl USING spgist(c);
+-- get reference results for ORDER BY distance from seq scan
+SET enable_seqscan = ON;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = OFF;
+CREATE TEMP TABLE quad_cirle_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl;
+CREATE TEMP TABLE quad_cirle_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+-- check results results from index scan
+SET enable_seqscan = OFF;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = ON;
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c << circle '<(300,400),200>';
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c << '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c << '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c << circle '<(300,400),200>';
+ count 
+-------
+   891
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c &< circle '<(300,400),200>';
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c &< '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c &< '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c &< circle '<(300,400),200>';
+ count 
+-------
+  5901
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c && circle '<(300,400),200>';
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c && '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c && '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c && circle '<(300,400),200>';
+ count 
+-------
+  2334
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c &> circle '<(300,400),200>';
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c &> '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c &> '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c &> circle '<(300,400),200>';
+ count 
+-------
+ 10000
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c >> circle '<(300,400),200>';
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c >> '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c >> '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c >> circle '<(300,400),200>';
+ count 
+-------
+  4990
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c <<| circle '<(300,400),200>';
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c <<| '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c <<| '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c <<| circle '<(300,400),200>';
+ count 
+-------
+  1890
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c &<| circle '<(300,400),200>';
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c &<| '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c &<| '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c &<| circle '<(300,400),200>';
+ count 
+-------
+  6900
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c |&> circle '<(300,400),200>';
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c |&> '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c |&> '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c |&> circle '<(300,400),200>';
+ count 
+-------
+  9000
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c |>> circle '<(300,400),200>';
+                         QUERY PLAN                          
+-------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c |>> '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c |>> '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c |>> circle '<(300,400),200>';
+ count 
+-------
+  3990
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c @> circle '<(300,400),1>';
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c @> '<(300,400),1>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c @> '<(300,400),1>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c @> circle '<(300,400),1>';
+ count 
+-------
+     2
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+                         QUERY PLAN                         
+------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c <@ '<(300,400),200>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c <@ '<(300,400),200>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+ count 
+-------
+  2181
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c ~= circle '<(300,400),1>';
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_circle_tbl
+         Recheck Cond: (c ~= '<(300,400),1>'::circle)
+         ->  Bitmap Index Scan on quad_circle_tbl_idx
+               Index Cond: (c ~= '<(300,400),1>'::circle)
+(5 rows)
+
+SELECT count(*) FROM quad_circle_tbl WHERE c ~= circle '<(300,400),1>';
+ count 
+-------
+     1
+(1 row)
+
+-- test ORDER BY distance
+SET enable_indexscan = ON;
+SET enable_bitmapscan = OFF;
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl;
+                          QUERY PLAN                           
+---------------------------------------------------------------
+ WindowAgg
+   ->  Index Scan using quad_circle_tbl_idx on quad_circle_tbl
+         Order By: (c <-> '(123,456)'::point)
+(3 rows)
+
+CREATE TEMP TABLE quad_cirle_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl;
+SELECT *
+FROM quad_cirle_tbl_ord_seq1 seq FULL JOIN quad_cirle_tbl_ord_idx1 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+ n | dist | id | n | dist | id 
+---+------+----+---+------+----
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+                          QUERY PLAN                           
+---------------------------------------------------------------
+ WindowAgg
+   ->  Index Scan using quad_circle_tbl_idx on quad_circle_tbl
+         Index Cond: (c <@ '<(300,400),200>'::circle)
+         Order By: (c <-> '(123,456)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_cirle_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+SELECT *
+FROM quad_cirle_tbl_ord_seq2 seq FULL JOIN quad_cirle_tbl_ord_idx2 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+ n | dist | id | n | dist | id 
+---+------+----+---+------+----
+(0 rows)
+
+RESET enable_seqscan;
+RESET enable_indexscan;
+RESET enable_bitmapscan;
diff --git a/src/test/regress/expected/polygon.out b/src/test/regress/expected/polygon.out
index 2361274..7a623be 100644
--- a/src/test/regress/expected/polygon.out
+++ b/src/test/regress/expected/polygon.out
@@ -227,3 +227,289 @@ SELECT	'(0,0)'::point <-> '((0,0),(1,2),(2,1))'::polygon as on_corner,
          0 |          0 |      0 | 1.4142135623731 |          3.2
 (1 row)
 
+--
+-- Test the SP-GiST index
+--
+CREATE TEMPORARY TABLE quad_poly_tbl (id int, p polygon);
+INSERT INTO quad_poly_tbl
+	SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10))
+	FROM generate_series(1, 100) x,
+		 generate_series(1, 100) y;
+INSERT INTO quad_poly_tbl
+	SELECT i, polygon '((200, 300),(210, 310),(230, 290))'
+	FROM generate_series(10001, 11000) AS i;
+INSERT INTO quad_poly_tbl
+	VALUES
+		(11001, NULL),
+		(11002, NULL),
+		(11003, NULL);
+CREATE INDEX quad_poly_tbl_idx ON quad_poly_tbl USING spgist(p);
+-- get reference results for ORDER BY distance from seq scan
+SET enable_seqscan = ON;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = OFF;
+CREATE TEMP TABLE quad_poly_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+CREATE TEMP TABLE quad_poly_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+-- check results results from index scan
+SET enable_seqscan = OFF;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = ON;
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p << polygon '((300,300),(400,600),(600,500),(700,200))';
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p << '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p << '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p << polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  3890
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p &< polygon '((300,300),(400,600),(600,500),(700,200))';
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p &< '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p &< '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p &< polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  7900
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p && polygon '((300,300),(400,600),(600,500),(700,200))';
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p && '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p && '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p && polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+   977
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p &> polygon '((300,300),(400,600),(600,500),(700,200))';
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p &> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p &> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p &> polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  7000
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p >> polygon '((300,300),(400,600),(600,500),(700,200))';
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p >> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p >> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p >> polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  2990
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p <<| polygon '((300,300),(400,600),(600,500),(700,200))';
+                                       QUERY PLAN                                       
+----------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p <<| '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p <<| '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p <<| polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  1890
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p &<| polygon '((300,300),(400,600),(600,500),(700,200))';
+                                       QUERY PLAN                                       
+----------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p &<| '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p &<| '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p &<| polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  6900
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p |&> polygon '((300,300),(400,600),(600,500),(700,200))';
+                                       QUERY PLAN                                       
+----------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p |&> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p |&> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p |&> polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  9000
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p |>> polygon '((300,300),(400,600),(600,500),(700,200))';
+                                       QUERY PLAN                                       
+----------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p |>> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p |>> '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p |>> polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+  3990
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p <@ '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p <@ '((300,300),(400,600),(600,500),(700,200))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+ count 
+-------
+   831
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p @> polygon '((340,550),(343,552),(341,553))';
+                                 QUERY PLAN                                  
+-----------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p @> '((340,550),(343,552),(341,553))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p @> '((340,550),(343,552),(341,553))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p @> polygon '((340,550),(343,552),(341,553))';
+ count 
+-------
+     1
+(1 row)
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(230, 290))';
+                                 QUERY PLAN                                  
+-----------------------------------------------------------------------------
+ Aggregate
+   ->  Bitmap Heap Scan on quad_poly_tbl
+         Recheck Cond: (p ~= '((200,300),(210,310),(230,290))'::polygon)
+         ->  Bitmap Index Scan on quad_poly_tbl_idx
+               Index Cond: (p ~= '((200,300),(210,310),(230,290))'::polygon)
+(5 rows)
+
+SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(230, 290))';
+ count 
+-------
+  1000
+(1 row)
+
+-- test ORDER BY distance
+SET enable_indexscan = ON;
+SET enable_bitmapscan = OFF;
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Scan using quad_poly_tbl_idx on quad_poly_tbl
+         Order By: (p <-> '(123,456)'::point)
+(3 rows)
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+SELECT *
+FROM quad_poly_tbl_ord_seq1 seq FULL JOIN quad_poly_tbl_ord_idx1 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+ n | dist | id | n | dist | id 
+---+------+----+---+------+----
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+                                   QUERY PLAN                                    
+---------------------------------------------------------------------------------
+ WindowAgg
+   ->  Index Scan using quad_poly_tbl_idx on quad_poly_tbl
+         Index Cond: (p <@ '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         Order By: (p <-> '(123,456)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT *
+FROM quad_poly_tbl_ord_seq2 seq FULL JOIN quad_poly_tbl_ord_idx2 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+ n | dist | id | n | dist | id 
+---+------+----+---+------+----
+(0 rows)
+
+RESET enable_seqscan;
+RESET enable_indexscan;
+RESET enable_bitmapscan;
diff --git a/src/test/regress/sql/circle.sql b/src/test/regress/sql/circle.sql
index c0284b2..16fa514 100644
--- a/src/test/regress/sql/circle.sql
+++ b/src/test/regress/sql/circle.sql
@@ -43,3 +43,131 @@ SELECT '' as five, c1.f1 AS one, c2.f1 AS two, (c1.f1 <-> c2.f1) AS distance
   FROM CIRCLE_TBL c1, CIRCLE_TBL c2
   WHERE (c1.f1 < c2.f1) AND ((c1.f1 <-> c2.f1) > 0)
   ORDER BY distance, area(c1.f1), area(c2.f1);
+
+--
+-- Test the SP-GiST index
+--
+
+CREATE TEMPORARY TABLE quad_circle_tbl (id int, c circle);
+
+INSERT INTO quad_circle_tbl
+	SELECT (x - 1) * 100 + y, circle(point(x * 10, y * 10), 1 + (x + y) % 10)
+	FROM generate_series(1, 100) x,
+		 generate_series(1, 100) y;
+
+INSERT INTO quad_circle_tbl
+	SELECT i, '<(200, 300), 5>'
+	FROM generate_series(10001, 11000) AS i;
+
+INSERT INTO quad_circle_tbl
+	VALUES
+		(11001, NULL),
+		(11002, NULL),
+		(11003, '<(0,100), infinity>'),
+		(11004, '<(-infinity,0),1000>'),
+		(11005, '<(infinity,-infinity),infinity>');
+
+CREATE INDEX quad_circle_tbl_idx ON quad_circle_tbl USING spgist(c);
+
+-- get reference results for ORDER BY distance from seq scan
+SET enable_seqscan = ON;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = OFF;
+
+CREATE TEMP TABLE quad_cirle_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl;
+
+CREATE TEMP TABLE quad_cirle_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+
+-- check results results from index scan
+SET enable_seqscan = OFF;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = ON;
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c << circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c << circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c &< circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c &< circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c && circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c && circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c &> circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c &> circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c >> circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c >> circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c <<| circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c <<| circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c &<| circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c &<| circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c |&> circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c |&> circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c |>> circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c |>> circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c @> circle '<(300,400),1>';
+SELECT count(*) FROM quad_circle_tbl WHERE c @> circle '<(300,400),1>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+SELECT count(*) FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_circle_tbl WHERE c ~= circle '<(300,400),1>';
+SELECT count(*) FROM quad_circle_tbl WHERE c ~= circle '<(300,400),1>';
+
+-- test ORDER BY distance
+SET enable_indexscan = ON;
+SET enable_bitmapscan = OFF;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl;
+
+CREATE TEMP TABLE quad_cirle_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl;
+
+SELECT *
+FROM quad_cirle_tbl_ord_seq1 seq FULL JOIN quad_cirle_tbl_ord_idx1 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+
+CREATE TEMP TABLE quad_cirle_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY c <-> point '123,456') n, c <-> point '123,456' dist, id
+FROM quad_circle_tbl WHERE c <@ circle '<(300,400),200>';
+
+SELECT *
+FROM quad_cirle_tbl_ord_seq2 seq FULL JOIN quad_cirle_tbl_ord_idx2 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+
+RESET enable_seqscan;
+RESET enable_indexscan;
+RESET enable_bitmapscan;
diff --git a/src/test/regress/sql/polygon.sql b/src/test/regress/sql/polygon.sql
index 7ac8079..8b8b434 100644
--- a/src/test/regress/sql/polygon.sql
+++ b/src/test/regress/sql/polygon.sql
@@ -116,3 +116,129 @@ SELECT	'(0,0)'::point <-> '((0,0),(1,2),(2,1))'::polygon as on_corner,
 	'(2,2)'::point <-> '((0,0),(1,4),(3,1))'::polygon as inside,
 	'(3,3)'::point <-> '((0,2),(2,0),(2,2))'::polygon as near_corner,
 	'(4,4)'::point <-> '((0,0),(0,3),(4,0))'::polygon as near_segment;
+
+--
+-- Test the SP-GiST index
+--
+
+CREATE TEMPORARY TABLE quad_poly_tbl (id int, p polygon);
+
+INSERT INTO quad_poly_tbl
+	SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10))
+	FROM generate_series(1, 100) x,
+		 generate_series(1, 100) y;
+
+INSERT INTO quad_poly_tbl
+	SELECT i, polygon '((200, 300),(210, 310),(230, 290))'
+	FROM generate_series(10001, 11000) AS i;
+
+INSERT INTO quad_poly_tbl
+	VALUES
+		(11001, NULL),
+		(11002, NULL),
+		(11003, NULL);
+
+CREATE INDEX quad_poly_tbl_idx ON quad_poly_tbl USING spgist(p);
+
+-- get reference results for ORDER BY distance from seq scan
+SET enable_seqscan = ON;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = OFF;
+
+CREATE TEMP TABLE quad_poly_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+
+CREATE TEMP TABLE quad_poly_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+
+-- check results results from index scan
+SET enable_seqscan = OFF;
+SET enable_indexscan = OFF;
+SET enable_bitmapscan = ON;
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p << polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p << polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p &< polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p &< polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p && polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p && polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p &> polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p &> polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p >> polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p >> polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p <<| polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p <<| polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p &<| polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p &<| polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p |&> polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p |&> polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p |>> polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p |>> polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT count(*) FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p @> polygon '((340,550),(343,552),(341,553))';
+SELECT count(*) FROM quad_poly_tbl WHERE p @> polygon '((340,550),(343,552),(341,553))';
+
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(230, 290))';
+SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(230, 290))';
+
+-- test ORDER BY distance
+SET enable_indexscan = ON;
+SET enable_bitmapscan = OFF;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+
+SELECT *
+FROM quad_poly_tbl_ord_seq1 seq FULL JOIN quad_poly_tbl_ord_idx1 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+
+SELECT *
+FROM quad_poly_tbl_ord_seq2 seq FULL JOIN quad_poly_tbl_ord_idx2 idx
+	ON seq.n = idx.n AND seq.id = idx.id AND
+		(seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+
+RESET enable_seqscan;
+RESET enable_indexscan;
+RESET enable_bitmapscan;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to