From d4459fe464d41bdd3fa5e81b310b095560f4f5b0 Mon Sep 17 00:00:00 2001
From: Peter Geoghegan <pg@bowt.ie>
Date: Sat, 17 Jun 2023 17:03:36 -0700
Subject: [PATCH v1] Enhance nbtree ScalarArrayOp execution.

Teach nbtree to elide unnecessary primitive index scans when executing
scans with ScalarArrayOp keys.
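
For example, given an index on (a, b), a query with a qual such as

  WHERE a IN (1, 2, 3) AND b IN (42, 43)

(hypothetical schema) has 6 distinct combinations of array keys.  When
matching tuples for adjacent combinations happen to be located on the same
leaf page, the scan can now advance its array keys "locally", avoiding a
new descent of the index within _bt_first for each combination.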
---
 src/include/access/nbtree.h           |  46 +-
 src/backend/access/nbtree/nbtree.c    |  21 +-
 src/backend/access/nbtree/nbtsearch.c |  85 +++-
 src/backend/access/nbtree/nbtutils.c  | 589 +++++++++++++++++++++++++-
 src/backend/optimizer/path/indxpath.c | 206 +++++++--
 src/backend/utils/adt/selfuncs.c      |  56 ++-
 6 files changed, 919 insertions(+), 84 deletions(-)

diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 8891fa797..5935dbc86 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -1034,6 +1034,42 @@ typedef struct BTArrayKeyInfo
 	Datum	   *elem_values;	/* array of num_elems Datums */
 } BTArrayKeyInfo;
 
+/*
+ * _bt_readpage state used across _bt_checkkeys calls for a page
+ */
+typedef struct BTReadPageState
+{
+	/*
+	 * Input parameters set by _bt_readpage, for _bt_checkkeys.
+	 *
+	 * dir: scan direction
+	 *
+	 * highkey:	page high key
+	 *
+	 * SK_SEARCHARRAY forward scans are required to set the page high key up
+	 * front.
+	 */
+	ScanDirection dir;
+	IndexTuple	highkey;
+
+	/*
+	 * Output parameters set by _bt_checkkeys, for _bt_readpage.
+	 *
+	 * continuescan: Is there a need to continue the scan beyond this tuple?
+	 */
+	bool		continuescan;
+
+	/*
+	 * Private _bt_checkkeys state, describes caller's page.
+	 *
+	 * match_for_cur_array_keys: did _bt_checkkeys return true at least once
+	 * for the current set of array keys?
+	 *
+	 * highkeychecked: has the current set of array keys been checked against
+	 * the page high key?
+	 */
+	bool		match_for_cur_array_keys;
+	bool		highkeychecked;
+} BTReadPageState;
+
 typedef struct BTScanOpaqueData
 {
 	/* these fields are set by _bt_preprocess_keys(): */
@@ -1047,7 +1083,9 @@ typedef struct BTScanOpaqueData
 								 * there are any unsatisfiable array keys) */
 	int			arrayKeyCount;	/* count indicating number of array scan keys
 								 * processed */
+	bool		arrayKeysStarted;	/* Scan still processing array keys? */
 	BTArrayKeyInfo *arrayKeys;	/* info about each equality-type array key */
+	BTScanInsert arrayPoskey;	/* initial positioning insertion scan key */
 	MemoryContext arrayContext; /* scan-lifespan context for array data */
 
 	/* info about killed items if any (killedItems is NULL if never used) */
@@ -1253,8 +1291,12 @@ extern bool _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir);
 extern void _bt_mark_array_keys(IndexScanDesc scan);
 extern void _bt_restore_array_keys(IndexScanDesc scan);
 extern void _bt_preprocess_keys(IndexScanDesc scan);
-extern bool _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple,
-						  int tupnatts, ScanDirection dir, bool *continuescan);
+extern void _bt_array_keys_save_scankeys(IndexScanDesc scan,
+										 BTScanInsert inskey);
+extern bool _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, bool final,
+						  BTReadPageState *pstate);
+extern void _bt_checkfinalkeys(IndexScanDesc scan, BTReadPageState *pstate);
+extern bool _bt_nocheckkeys(IndexScanDesc scan, ScanDirection dir);
 extern void _bt_killitems(IndexScanDesc scan);
 extern BTCycleId _bt_vacuum_cycleid(Relation rel);
 extern BTCycleId _bt_start_vacuum(Relation rel);
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 4553aaee5..7ccd5f3f3 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -363,7 +363,9 @@ btbeginscan(Relation rel, int nkeys, int norderbys)
 
 	so->arrayKeyData = NULL;	/* assume no array keys for now */
 	so->numArrayKeys = 0;
+	so->arrayKeysStarted = false;
 	so->arrayKeys = NULL;
+	so->arrayPoskey = NULL;
 	so->arrayContext = NULL;
 
 	so->killedItems = NULL;		/* until needed */
@@ -404,6 +406,7 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 
 	so->markItemIndex = -1;
 	so->arrayKeyCount = 0;
+	so->arrayKeysStarted = false;
 	BTScanPosUnpinIfPinned(so->markPos);
 	BTScanPosInvalidate(so->markPos);
 
@@ -752,7 +755,23 @@ _bt_parallel_done(IndexScanDesc scan)
  *			keys.
  *
  * Updates the count of array keys processed for both local and parallel
- * scans.
+ * scans. (XXX Really? Then why is "scan->parallel_scan != NULL" used as a
+ * gating condition by our caller?)
+ *
+ * XXX Local advancement of array keys occurs dynamically, and affects the
+ * top-level scan state.  This is at odds with how parallel scans deal with
+ * array key advancement here, so for now we just don't support them at all.
+ *
+ * The issue here is that the leader instructs workers to process array keys
+ * in whatever order is convenient, without concern for repeat or concurrent
+ * accesses to the same physical leaf pages by workers.  This can be addressed
+ * by assigning batches of array keys to workers.  Each individual batch would
+ * match a range from the key space covered by some specific leaf page.  That
+ * whole approach requires dynamic back-and-forth key space partitioning.
+ *
+ * It seems important that parallel index scans match serial index scans in
+ * promising that no single leaf page will be accessed more than once.  That
+ * makes reasoning about the worst case much easier when costing scans.
  */
 void
 _bt_parallel_advance_array_keys(IndexScanDesc scan)
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index 3230b3b89..dcf399acd 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -890,6 +890,18 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 
 	Assert(!BTScanPosIsValid(so->currPos));
 
+	/*
+	 * XXX Queries with SAOPs have always accounted for each call here as one
+	 * "index scan".  This meant that the accounting showed one index scan per
+	 * distinct SAOP constant.  This approach is consistent with how it was
+	 * done before nbtree was taught to handle ScalarArrayOpExpr quals itself
+	 * (it's also how non-amsearcharray index AMs still do it).
+	 *
+	 * Right now, eliding a primitive index scan elides a call here, resulting
+	 * in one less "index scan" recorded by pgstat.  This seems defensible,
+	 * though not necessarily desirable.  Now implementation details can have
+	 * a significant impact on user-visible index scan counts.
+	 */
 	pgstat_count_index_scan(rel);
 
 	/*
@@ -1370,6 +1382,13 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
 	inskey.scantid = NULL;
 	inskey.keysz = keysCount;
 
+	/*
+	 * Save insertion scan key for SK_SEARCHARRAY scans, which need it to
+	 * advance the scan's array keys locally
+	 */
+	if (so->numArrayKeys > 0)
+		_bt_array_keys_save_scankeys(scan, &inskey);
+
 	/*
 	 * Use the manufactured insertion scan key to descend the tree and
 	 * position ourselves on the target leaf page.
@@ -1548,9 +1567,8 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 	BTPageOpaque opaque;
 	OffsetNumber minoff;
 	OffsetNumber maxoff;
+	BTReadPageState pstate;
 	int			itemIndex;
-	bool		continuescan;
-	int			indnatts;
 
 	/*
 	 * We must have the buffer pinned and locked, but the usual macro can't be
@@ -1570,8 +1588,12 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 			_bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf));
 	}
 
-	continuescan = true;		/* default assumption */
-	indnatts = IndexRelationGetNumberOfAttributes(scan->indexRelation);
+	pstate.dir = dir;
+	pstate.highkey = NULL;
+	pstate.continuescan = true; /* default assumption */
+	pstate.match_for_cur_array_keys = false;
+	pstate.highkeychecked = false;
+
 	minoff = P_FIRSTDATAKEY(opaque);
 	maxoff = PageGetMaxOffsetNumber(page);
 
@@ -1606,6 +1628,14 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 
 	if (ScanDirectionIsForward(dir))
 	{
+		/* SK_SEARCHARRAY scans must provide high key up front */
+		if (so->numArrayKeys && !P_RIGHTMOST(opaque))
+		{
+			ItemId		iid = PageGetItemId(page, P_HIKEY);
+
+			pstate.highkey = (IndexTuple) PageGetItem(page, iid);
+		}
+
 		/* load items[] in ascending order */
 		itemIndex = 0;
 
@@ -1628,7 +1658,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 
 			itup = (IndexTuple) PageGetItem(page, iid);
 
-			if (_bt_checkkeys(scan, itup, indnatts, dir, &continuescan))
+			if (_bt_checkkeys(scan, itup, false, &pstate))
 			{
 				/* tuple passes all scan key conditions */
 				if (!BTreeTupleIsPosting(itup))
@@ -1661,7 +1691,7 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 				}
 			}
 			/* When !continuescan, there can't be any more matches, so stop */
-			if (!continuescan)
+			if (!pstate.continuescan)
 				break;
 
 			offnum = OffsetNumberNext(offnum);
@@ -1678,17 +1708,19 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 		 * only appear on non-pivot tuples on the right sibling page are
 		 * common.
 		 */
-		if (continuescan && !P_RIGHTMOST(opaque))
+		if (pstate.continuescan)
 		{
-			ItemId		iid = PageGetItemId(page, P_HIKEY);
-			IndexTuple	itup = (IndexTuple) PageGetItem(page, iid);
-			int			truncatt;
+			if (!P_RIGHTMOST(opaque) && !pstate.highkey)
+			{
+				ItemId		iid = PageGetItemId(page, P_HIKEY);
 
-			truncatt = BTreeTupleGetNAtts(itup, scan->indexRelation);
-			_bt_checkkeys(scan, itup, truncatt, dir, &continuescan);
+				pstate.highkey = (IndexTuple) PageGetItem(page, iid);
+			}
+
+			_bt_checkfinalkeys(scan, &pstate);
 		}
 
-		if (!continuescan)
+		if (!pstate.continuescan)
 			so->currPos.moreRight = false;
 
 		Assert(itemIndex <= MaxTIDsPerBTreePage);
@@ -1722,8 +1754,8 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 			 */
 			if (scan->ignore_killed_tuples && ItemIdIsDead(iid))
 			{
-				Assert(offnum >= P_FIRSTDATAKEY(opaque));
-				if (offnum > P_FIRSTDATAKEY(opaque))
+				Assert(offnum >= minoff);
+				if (offnum > minoff)
 				{
 					offnum = OffsetNumberPrev(offnum);
 					continue;
@@ -1736,8 +1768,8 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 
 			itup = (IndexTuple) PageGetItem(page, iid);
 
-			passes_quals = _bt_checkkeys(scan, itup, indnatts, dir,
-										 &continuescan);
+			passes_quals = _bt_checkkeys(scan, itup, offnum == minoff,
+										 &pstate);
 			if (passes_quals && tuple_alive)
 			{
 				/* tuple passes all scan key conditions */
@@ -1776,16 +1808,25 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
 					}
 				}
 			}
-			if (!continuescan)
-			{
-				/* there can't be any more matches, so stop */
-				so->currPos.moreLeft = false;
+			/* When !continuescan, there can't be any more matches, so stop */
+			if (!pstate.continuescan)
 				break;
-			}
 
 			offnum = OffsetNumberPrev(offnum);
 		}
 
+		/*
+		 * Backward scans never check the high key, but must still call
+		 * _bt_nocheckkeys when they reach the last page (the leftmost page)
+		 * without any tuple ever setting continuescan to false.
+		 */
+		if (pstate.continuescan && P_LEFTMOST(opaque) &&
+			_bt_nocheckkeys(scan, dir))
+			pstate.continuescan = false;
+
+		if (!pstate.continuescan)
+			so->currPos.moreLeft = false;
+
 		Assert(itemIndex >= 0);
 		so->currPos.firstItem = itemIndex;
 		so->currPos.lastItem = MaxTIDsPerBTreePage - 1;
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 7da499c4d..af8accbd3 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -45,11 +45,19 @@ static int	_bt_sort_array_elements(IndexScanDesc scan, ScanKey skey,
 									bool reverse,
 									Datum *elems, int nelems);
 static int	_bt_compare_array_elements(const void *a, const void *b, void *arg);
+static bool _bt_advance_array_keys_locally(IndexScanDesc scan,
+										   IndexTuple tuple, bool final,
+										   BTReadPageState *pstate);
+static bool _bt_tuple_advances_keys(IndexScanDesc scan, IndexTuple tuple,
+									ScanDirection dir);
 static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op,
 									 ScanKey leftarg, ScanKey rightarg,
 									 bool *result);
 static bool _bt_fix_scankey_strategy(ScanKey skey, int16 *indoption);
 static void _bt_mark_scankey_required(ScanKey skey);
+static bool _bt_check_compare(ScanKey keyData, int keysz,
+							  IndexTuple tuple, int tupnatts, TupleDesc tupdesc,
+							  ScanDirection dir, bool *continuescan);
 static bool _bt_check_rowcompare(ScanKey skey,
 								 IndexTuple tuple, int tupnatts, TupleDesc tupdesc,
 								 ScanDirection dir, bool *continuescan);
@@ -202,6 +210,29 @@ _bt_freestack(BTStack stack)
  * array keys, it's sufficient to find the extreme element value and replace
  * the whole array with that scalar value.
  *
+ * It's important that we consistently avoid leaving behind SK_SEARCHARRAY
+ * inequalities after preprocessing, since _bt_advance_array_keys_locally
+ * expects to be able to treat SK_SEARCHARRAY keys as equality constraints.
+ * This makes it possible for the scan to take advantage of naturally
+ * occurring locality, avoiding continually redescending the index in
+ * _bt_first.  We can advance the array keys opportunistically inside
+ * _bt_checkkeys.  This won't affect the externally visible behavior of the
+ * scan.
+ *
+ * In the worst case, the number of primitive index scans will equal the
+ * number of array elements (or the product of each array's element count
+ * when there are multiple arrays/columns involved).  It's also possible
+ * that the total number of primitive index scans will be far less than
+ * that.
+ *
+ * We always sort and deduplicate arrays up-front for equality array keys.
+ * ScalarArrayOpExpr execution need only visit leaf pages that might contain
+ * matches exactly once, while preserving the sort order of the index.  This
+ * isn't just about performance; it also avoids needing duplicate elimination
+ * of matching TIDs (we prefer deduplicating search keys once, up-front).
+ * Equality SK_SEARCHARRAY keys are disjuncts that we always process in
+ * index/key space order, which makes this general approach feasible.  Every
+ * index tuple will match no more than one distinct combination of
+ * equality-constrained keys (array keys and other equality keys).
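+ * For example, the array for a qual like "WHERE a IN (5, 3, 5, 1)" is
+ * sorted and deduplicated up front, leaving the scan with the distinct
+ * elements {1, 3, 5} (assuming an ASC index column).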
+ *
  * Note: the reason we need so->arrayKeyData, rather than just scribbling
  * on scan->keyData, is that callers are permitted to call btrescan without
  * supplying a new set of scankey data.
@@ -539,6 +570,9 @@ _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir)
 			curArrayKey->cur_elem = 0;
 		skey->sk_argument = curArrayKey->elem_values[curArrayKey->cur_elem];
 	}
+
+	/* Tell _bt_advance_array_keys to advance array keys when called */
+	so->arrayKeysStarted = true;
 }
 
 /*
@@ -546,6 +580,10 @@ _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir)
  *
  * Returns true if there is another set of values to consider, false if not.
  * On true result, the scankeys are initialized with the next set of values.
+ *
+ * On false result, local advancement of the array keys has reached the end of
+ * each of the arrays for the current scan direction.  Only our btgettuple and
+ * btgetbitmap callers should rely on this.
  */
 bool
 _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
@@ -554,6 +592,9 @@ _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
 	bool		found = false;
 	int			i;
 
+	if (!so->arrayKeysStarted)
+		return false;
+
 	/*
 	 * We must advance the last array key most quickly, since it will
 	 * correspond to the lowest-order index column among the available
@@ -594,6 +635,10 @@ _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
 			break;
 	}
 
+	/* Scan reached the end of its array keys in the current scan direction */
+	if (!found)
+		so->arrayKeysStarted = false;
+
 	/* advance parallel scan */
 	if (scan->parallel_scan != NULL)
 		_bt_parallel_advance_array_keys(scan);
@@ -601,6 +646,391 @@ _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
 	return found;
 }
 
+/*
+ * Check if we need to advance SK_SEARCHARRAY array keys when
+ * _bt_check_compare returns false and sets continuescan=false.  It's
+ * possible that the tuple will be a match once the array keys are advanced.
+ *
+ * It is often possible for SK_SEARCHARRAY scans to skip one or more primitive
+ * index scans.  Starting a new primitive scan is only required when it is
+ * truly necessary to reposition the top-level scan to some distant leaf page
+ * (where the start of the key space for the next set of search keys begins).
+ * This process (redescending the index) is implemented by calling _bt_first
+ * after the array keys are "globally advanced" by the top-level index scan.
+ *
+ * Starting a new primitive index scan is avoided whenever the end of matches
+ * for the current set of array keys happens to be physically close to the
+ * start of matches for the next set of array keys.  The technique isn't used
+ * when matches for the next set of array keys aren't found on the same leaf
+ * page (unless there is good reason to believe that a visit to the next leaf
+ * page needs to take place).
+ *
+ * In the worst case the top-level index scan performs one primitive index
+ * scan per distinct set of array/search keys.  In the best case we require
+ * only a single primitive index scan for the entire top-level index scan
+ * (this is even possible with arbitrarily-many distinct sets of array keys).
+ * The optimization is particularly effective with queries that have several
+ * SK_SEARCHARRAY keys (one per index column) when scanning a composite index.
+ * Most individual search key combinations (which are simple conjunctions) may
+ * well turn out to have no matching index tuples.
+ *
+ * Returns false when the array keys have not advanced (or cannot advance).
+ * A new primitive index scan will be required -- except when the top-level,
+ * btrescan-wise index scan has processed all array keys in the current scan
+ * direction.
+ *
+ * Returns true when array keys were advanced "locally".  Caller must recheck
+ * the tuple that initially set continuescan=false against the new array keys.
+ * At this point the newly advanced array keys are provisional.  The "current"
+ * keys only get "locked in" to the ongoing primitive scan when _bt_checkkeys
+ * returns its first match for the keys.  This must happen almost immediately;
+ * we should only invest in eliding primitive index scans when we're almost
+ * certain that it'll work out.
+ *
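+ * For example (hypothetical data): with a qual "WHERE a IN (1, 3, 5)" and a
+ * leaf page containing the key values 1, 1, 3, 3, 5, _bt_check_compare sets
+ * continuescan=false upon reaching the first "3" tuple (the current array
+ * key is still 1 at that point).  We advance the array keys in place, our
+ * caller rechecks the same tuple against the new keys, and the primitive
+ * index scan simply continues -- no new _bt_first descent is needed.
+ *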
+ * Note: The fact that we only advance array keys "provisionally" imposes a
+ * requirement on _bt_readpage: it must call _bt_checkfinalkeys whenever its
+ * scan of a leaf page wasn't terminated when it called _bt_checkkeys against
+ * non-pivot tuples.  This scheme ensures that we'll always have at least one
+ * opportunity to change our minds per leaf page scanned (even, say, on a page
+ * that only contains non-pivot tuples whose LP_DEAD bits are set).
+ *
+ * Note: We can determine that the next leaf page ought to be handled by the
+ * ongoing primitive index scan without being fully sure that it'll work out
+ * -- in effect the scan sometimes visits the next leaf page speculatively.
+ * This occasionally wastes cycles on a useless visit to an extra page, which
+ * then terminates the primitive scan.  Such wasted accesses are only
+ * possible when the high key (or the final tuple in the case of backwards
+ * scans) is within the bounds of the latest set of array keys that the
+ * primitive scan can advance to.
+ */
+static bool
+_bt_advance_array_keys_locally(IndexScanDesc scan, IndexTuple tuple,
+							   bool final, BTReadPageState *pstate)
+{
+	BTScanOpaque so = (BTScanOpaque) scan->opaque;
+
+	Assert(!pstate->continuescan);
+	Assert(so->arrayKeysStarted);
+
+	if (!so->arrayPoskey)
+	{
+		/*
+		 * Scans that lack an initial positioning key (and so must go through
+		 * _bt_endpoint rather than calling _bt_search from _bt_first) are not
+		 * capable of locally advancing array keys
+		 */
+		return false;
+	}
+
+	/*
+	 * Current search-type scan keys (including current array keys) indicated
+	 * that this tuple terminates the scan in our _bt_checkkeys caller.  Can
+	 * this tuple be a match for a later set of array keys, once advanced?
+	 */
+	if (!_bt_tuple_advances_keys(scan, tuple, pstate->dir))
+	{
+		/*
+		 * Tuple definitely isn't a match for any set of search keys.  Tuple
+		 * definitely won't be returned by _bt_checkkeys.  Now we need to
+		 * determine if the scan will continue to the next tuple/page.
+		 *
+		 * If this is a forwards scan, check the high key -- page state
+		 * stashes it in order to allow us to terminate processing of a page
+		 * (and the primitive index scan as a whole) early.
+		 *
+		 * If this is a backwards scan, treat the first non-pivot tuple as a
+		 * stand-in for the page high key.  Unlike the forward scan case, this
+		 * is only possible when _bt_checkkeys reaches the final tuple on the
+		 * page.  (Only the more common forward scan case has the ability to
+		 * end the scan of an individual page early using the high key because
+		 * we always have the high key stashed.)
+		 *
+		 * This always needs to happen before we leave each leaf page, for all
+		 * sets of array keys up to and including the last set we advance to.
+		 * We must avoid becoming confused about which primitive index scan
+		 * (the current or the next) returns matches for any set of array
+		 * keys.
+		 */
+		if (!pstate->match_for_cur_array_keys &&
+			(final || (!pstate->highkeychecked && pstate->highkey)))
+		{
+			Assert(ScanDirectionIsForward(pstate->dir) || !pstate->highkey);
+			Assert(ScanDirectionIsBackward(pstate->dir) || !final);
+
+			pstate->highkeychecked = true;	/* only used by forward scans */
+
+			if (final || !_bt_tuple_advances_keys(scan, pstate->highkey,
+												  pstate->dir))
+			{
+				/*
+				 * We're unlikely to find any further matches for the current
+				 * set of array keys on the next sibling leaf page.
+				 *
+				 * Back up the array keys so that btgettuple or btgetbitmap
+				 * won't advance the keys past the now-current set.  This is
+				 * safe because we haven't returned any tuples matching this
+				 * set of keys.
+				 */
+				ScanDirection flipdir = -pstate->dir;
+
+				if (!_bt_advance_array_keys(scan, flipdir))
+					Assert(false);
+
+				_bt_preprocess_keys(scan);
+
+				/* End the current primitive index scan */
+				pstate->continuescan = false;	/* redundant */
+				return false;
+			}
+		}
+
+		/*
+		 * Continue the current primitive index scan.  Returning false
+		 * indicates that we're done with this tuple.  The ongoing primitive
+		 * index scan will proceed to the next non-pivot tuple on this page
+		 * (or to the first non-pivot tuple on the next page).
+		 */
+		pstate->continuescan = true;
+		return false;
+	}
+
+	if (!_bt_advance_array_keys(scan, pstate->dir))
+	{
+		Assert(!so->arrayKeysStarted);
+
+		/*
+		 * Ran out of array keys to advance the scan to.  The top-level,
+		 * btrescan-wise scan has been terminated by this tuple.
+		 */
+		pstate->continuescan = false;	/* redundant */
+		return false;
+	}
+
+	/*
+	 * Successfully advanced the array keys.  We'll now need to see what
+	 * _bt_checkkeys loop says about the same tuple with this new set of keys.
+	 *
+	 * Advancing the array keys is only provisional at this point.  If there
+	 * are no matches for the new array keys before we leave the page, and
+	 * the high key check indicates that there is little chance of finding any
+	 * matches for the new keys on the next page, we will change our mind.
+	 * This is handled by "backing up" the array keys, and then starting a new
+	 * primitive index scan for the same set of array keys.
+	 *
+	 * XXX Clearly it would be a lot more efficient if we were to implement
+	 * all this by searching for the next set of array keys using this tuple's
+	 * key values, directly.  Right now we effectively use a linear search
+	 * (though one that can terminate upon finding the first match).  We must
+	 * make it into a binary search to get acceptable performance.
+	 *
+	 * Our current naive approach works well enough for prototyping purposes,
+	 * but chokes in extreme cases where the Cartesian product of all SAOP
+	 * arrays (i.e. the total number of DNF single value predicates generated
+	 * by the _bt_advance_array_keys state machine) starts to get unwieldy.
+	 * We're holding a buffer lock here, so this isn't really negotiable.
+	 *
+	 * It's not particularly unlikely that the total number of DNF predicates
+	 * exceeds the number of tuples that'll be returned by the ongoing scan.
+	 * Efficiently advancing the array keys might turn out to matter almost as
+	 * much as efficiently searching for the next matching index tuple.
+	 */
+	_bt_preprocess_keys(scan);
+
+	if (pstate->highkey)
+	{
+		/* High key precheck might need to be repeated for new array keys */
+		pstate->match_for_cur_array_keys = false;
+		pstate->highkeychecked = false;
+	}
+
+	/*
+	 * Note: It doesn't matter how continuescan is set by us at this point.
+	 * The next iteration of caller's loop will overwrite continuescan.
+	 */
+	return true;
+}
+
+/*
+ * Helper routine used by _bt_advance_array_keys_locally.
+ *
+ * We're called with tuples that _bt_checkkeys set continuescan to false for.
+ * We distinguish between search-type scan keys that have equality constraints
+ * on an index column (which are always marked as required in both directions)
+ * and other search-type scan keys that are required in one direction only.
+ * The distinction is important independent of the current scan direction,
+ * since caller should only advance array keys when an equality constraint
+ * indicated the end of the current set of array keys.  (Note also that
+ * non-equality "required in one direction only" scan keys can only end the
+ * entire btrescan-wise scan when we run out of array keys to process for the
+ * current scan direction).
+ *
+ * We help our caller identify where matches for the next set of array keys
+ * _might_ begin when it turns out that we can elide another descent of the
+ * index for the next set of array keys.  There will be a gap of 0 or more
+ * non-matching index tuples between the last tuple that satisfies the current
+ * set of scan keys (including its array keys), and the first tuple that might
+ * satisfy the next set (caller won't know for sure until after it advances
+ * the current set of array keys).  This gap might be negligible, or it might
+ * be a significant fraction of all non-pivot tuples on the leaf level.
+ *
+ * The qual "WHERE x IN (3,4,5) AND y < 42" will have its 'y' scan key marked
+ * SK_BT_REQFWD (not SK_BT_REQBKWD) -- 'y' isn't an equality constraint.
+ * _bt_checkkeys will set continuescan=false as soon as the scan reaches a
+ * tuple matching (3, 42) or a tuple matching (4, 1).  Eliding the next
+ * primitive index scan (by advancing the array keys locally) happens when the
+ * gap is confined to a single leaf page.  Caller continues its scan through
+ * these gap tuples, and calls back here to check if it has found the point
+ * that it might be necessary to advance its array keys.
+ *
+ * Returns false when caller's tuple definitely isn't where the next group of
+ * matching tuples begins.  Caller can either continue the process with the
+ * very next tuple from its leaf page, or give up completely.  Giving up means
+ * that caller accepts that there must be another _bt_first descent (in the
+ * likely event of another call to btgettuple/btgetbitmap from the executor).
+ *
+ * Returns true when caller passed a tuple that might be a match for the next
+ * set of array keys.  That is, when the tuple is > the current set of array
+ * keys and other equality constraints for a forward scan (or < for a
+ * backward scan).  Caller must attempt to advance the array keys when this
+ * happens.
+ *
+ * Note: Our test is based on the current equality constraint scan keys rather
+ * than the next set in line because it's not yet clear if the next set in
+ * line will find any matches whatsoever.  Once caller is positioned at the
+ * first tuple that might satisfy the next set of array keys, it could be
+ * necessary for it to advance its array keys more than once.
+ */
+static bool
+_bt_tuple_advances_keys(IndexScanDesc scan, IndexTuple tuple, ScanDirection dir)
+{
+	BTScanOpaque so = (BTScanOpaque) scan->opaque;
+	Relation	rel = scan->indexRelation;
+	TupleDesc	itupdesc = RelationGetDescr(rel);
+	bool		tuple_ahead = true;
+	int			ncmpkey;
+
+	Assert(so->qual_ok);
+	Assert(so->numArrayKeys > 0);
+	Assert(so->numberOfKeys > 0);
+	Assert(so->arrayPoskey->keysz > 0);
+
+	ncmpkey = Min(BTreeTupleGetNAtts(tuple, rel), so->numberOfKeys);
+	for (int attnum = 1; attnum <= ncmpkey; attnum++)
+	{
+		ScanKey		cur = &so->keyData[attnum - 1];
+		ScanKey		iscankey;
+		Datum		datum;
+		bool		isNull;
+		int32		result;
+
+		if ((ScanDirectionIsForward(dir) &&
+			 (cur->sk_flags & SK_BT_REQFWD) == 0) ||
+			(ScanDirectionIsBackward(dir) &&
+			 (cur->sk_flags & SK_BT_REQBKWD) == 0))
+		{
+			/*
+			 * This scan key is not marked as required for the current
+			 * direction, so there are no further attributes to consider. This
+			 * tuple definitely isn't at the start of the next group of
+			 * matching tuples.
+			 */
+			break;
+		}
+
+		Assert(cur->sk_attno == attnum);
+		if (cur->sk_attno > so->arrayPoskey->keysz)
+		{
+			/*
+			 * There is no equality constraint on this column/scan key to
+			 * break the tie.  This tuple definitely isn't at the start of the
+			 * next group of matching tuples.
+			 */
+			Assert(cur->sk_strategy != BTEqualStrategyNumber);
+			Assert((cur->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) !=
+				   (SK_BT_REQFWD | SK_BT_REQBKWD));
+			break;
+		}
+
+		/*
+		 * This column has an equality constraint/insertion scan key entry
+		 */
+		Assert((cur->sk_flags & (SK_BT_REQFWD | SK_BT_REQBKWD)) ==
+			   (SK_BT_REQFWD | SK_BT_REQBKWD));
+		Assert(cur->sk_strategy == BTEqualStrategyNumber);
+
+		/*
+		 * Row comparison scan keys may be present after (though never before)
+		 * columns that we recognized as having equality constraints.
+		 *
+		 * A qual like "WHERE a in (1, 2, 3) AND (b, c) >= (500, 7)" is safe,
+		 * whereas "WHERE (a, b) >= (1, 500) AND c in (7, 8, 9)" is unsafe.
+		 * Assert that this isn't one of the unsafe cases in passing.
+		 */
+		Assert((cur->sk_flags & SK_ROW_HEADER) == 0);
+
+		/*
+		 * We'll need to use this attribute's 3-way comparison order proc
+		 * (btree opclass support function 1) from its insertion-type scan key
+		 */
+		iscankey = &so->arrayPoskey->scankeys[attnum - 1];
+		Assert(iscankey->sk_flags == cur->sk_flags);
+		Assert(iscankey->sk_attno == cur->sk_attno);
+		Assert(iscankey->sk_subtype == cur->sk_subtype);
+		Assert(iscankey->sk_collation == cur->sk_collation);
+
+		/*
+		 * The 3-way comparison order proc will be called using the
+		 * search-type scan key's current sk_argument
+		 */
+		datum = index_getattr(tuple, attnum, itupdesc, &isNull);
+		if (iscankey->sk_flags & SK_ISNULL) /* key is NULL */
+		{
+			if (isNull)
+				result = 0;		/* NULL "=" NULL */
+			else if (iscankey->sk_flags & SK_BT_NULLS_FIRST)
+				result = -1;	/* NULL "<" NOT_NULL */
+			else
+				result = 1;		/* NULL ">" NOT_NULL */
+		}
+		else if (isNull)		/* key is NOT_NULL and item is NULL */
+		{
+			if (iscankey->sk_flags & SK_BT_NULLS_FIRST)
+				result = 1;		/* NOT_NULL ">" NULL */
+			else
+				result = -1;	/* NOT_NULL "<" NULL */
+		}
+		else
+		{
+			/*
+			 * The sk_func needs to be passed the index value as left arg and
+			 * the sk_argument as right arg (they might be of different
+			 * types).  We want to keep this consistent with what _bt_compare
+			 * does, so we flip the sign of the comparison result.  (Unless
+			 * it's a DESC column, in which case we *don't* flip the sign.)
+			 */
+			result = DatumGetInt32(FunctionCall2Coll(&iscankey->sk_func,
+													 cur->sk_collation, datum,
+													 cur->sk_argument));
+			if (!(iscankey->sk_flags & SK_BT_DESC))
+				INVERT_COMPARE_RESULT(result);
+		}
+
+		if (result != 0)
+		{
+			if (ScanDirectionIsForward(dir))
+				tuple_ahead = result < 0;
+			else
+				tuple_ahead = result > 0;
+
+			break;
+		}
+	}
+
+	return tuple_ahead;
+}
+
 /*
  * _bt_mark_array_keys() -- Handle array keys during btmarkpos
  *
@@ -744,6 +1174,12 @@ _bt_restore_array_keys(IndexScanDesc scan)
  * storage is that we are modifying the array based on comparisons of the
  * key argument values, which could change on a rescan or after moving to
  * new elements of array keys.  Therefore we can't overwrite the source data.
+ *
+ * TODO Replace all calls to this function added by the patch with calls to
+ * some other more specialized function with reduced surface area -- something
+ * that is explicitly safe to call while holding a buffer lock.  That's been
+ * put off for now because the code in this function is likely to need to be
+ * better integrated with the planner before long anyway.
  */
 void
 _bt_preprocess_keys(IndexScanDesc scan)
@@ -1012,6 +1448,45 @@ _bt_preprocess_keys(IndexScanDesc scan)
 	so->numberOfKeys = new_numberOfKeys;
 }
 
+/*
+ * Save insertion scankey for searches with a SK_SEARCHARRAY scan key.
+ *
+ * We must save the initial positioning insertion scan key for SK_SEARCHARRAY
+ * scans (barring those that only have SK_SEARCHARRAY inequalities).  Each
+ * insertion scan key entry/column will have a corresponding "=" operator in
+ * caller's search-type scan key, but that's no substitute for the 3-way
+ * comparison function.
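+ * (For example, the "=" operator proc int4eq can only report that 7 != 5;
+ * the opclass's 3-way comparator btint4cmp also reports that 7 > 5, which is
+ * what we need in order to tell whether a tuple is before or past the
+ * current array keys.)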
+ *
+ * _bt_tuple_advances_keys needs to perform 3-way comparisons to figure out if
+ * an ongoing scan can elide another descent of the index in _bt_first.  It
+ * works by locating the end of the _current_ set of equality constraint type
+ * scan keys -- not by locating the start of the next set.  This is not unlike
+ * the approach taken by _bt_search with a nextkey=true search.
+ */
+void
+_bt_array_keys_save_scankeys(IndexScanDesc scan, BTScanInsert inskey)
+{
+	BTScanOpaque so = (BTScanOpaque) scan->opaque;
+	Size		sksize;
+
+	Assert(inskey->keysz > 0);
+	Assert(so->numArrayKeys > 0);
+	Assert(so->qual_ok);
+	Assert(!BTScanPosIsValid(so->currPos));
+
+	if (so->arrayPoskey)
+	{
+		/* Reuse the insertion scan key from the last primitive index scan */
+		Assert(so->arrayPoskey->keysz == inskey->keysz);
+		return;
+	}
+
+	sksize = offsetof(BTScanInsertData, scankeys) +
+		sizeof(ScanKeyData) * inskey->keysz;
+	so->arrayPoskey = palloc(sksize);
+	memcpy(so->arrayPoskey, inskey, sksize);
+}
+
 /*
  * Compare two scankey values using a specified operator.
  *
@@ -1348,35 +1823,68 @@ _bt_mark_scankey_required(ScanKey skey)
  * this tuple, and set *continuescan accordingly.  See comments for
  * _bt_preprocess_keys(), above, about how this is done.
  *
- * Forward scan callers can pass a high key tuple in the hopes of having
- * us set *continuescan to false, and avoiding an unnecessary visit to
- * the page to the right.
+ * Advances the current set of array keys locally for SK_SEARCHARRAY scans
+ * where appropriate.  SK_SEARCHARRAY forward scan callers are required to
+ * initialize the page-level high key in pstate up front.
  *
  * scan: index scan descriptor (containing a search-type scankey)
  * tuple: index tuple to test
- * tupnatts: number of attributes in tupnatts (high key may be truncated)
- * dir: direction we are scanning in
- * continuescan: output parameter (will be set correctly in all cases)
+ * final: is tuple the final one on caller's page?  (Backwards scans only.)
+ * pstate: page-level input and output parameters
  */
 bool
-_bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, int tupnatts,
-			  ScanDirection dir, bool *continuescan)
+_bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, bool final,
+			  BTReadPageState *pstate)
+{
+	TupleDesc	tupdesc = RelationGetDescr(scan->indexRelation);
+	int			natts = BTreeTupleGetNAtts(tuple, scan->indexRelation);
+	BTScanOpaque so = (BTScanOpaque) scan->opaque;
+	bool		res;
+
+	/* This loop handles advancing to the next array elements, if any */
+	do
+	{
+		res = _bt_check_compare(so->keyData, so->numberOfKeys,
+								tuple, natts, tupdesc,
+								pstate->dir, &pstate->continuescan);
+
+		/* If we have a tuple, return it ... */
+		if (res)
+		{
+			pstate->match_for_cur_array_keys = true;
+
+			Assert(!so->numArrayKeys || !so->arrayPoskey ||
+				   _bt_tuple_advances_keys(scan, tuple, pstate->dir));
+			break;
+		}
+
+		/* ... otherwise see if we have more array keys to deal with */
+	} while (so->numArrayKeys && !pstate->continuescan &&
+			 _bt_advance_array_keys_locally(scan, tuple, final, pstate));
+
+	return res;
+}
+
+/*
+ * Test whether an index tuple satisfies the current scan conditions.
+ *
+ * Return true if so, false if not.  If not, also set *continuescan to false
+ * if it's not possible for any future tuples in the current scan direction
+ * to pass the qual with the current set of array keys.
+ *
+ * This is a subroutine for _bt_checkkeys.
+ */
+static bool
+_bt_check_compare(ScanKey keyData, int keysz,
+				  IndexTuple tuple, int tupnatts, TupleDesc tupdesc,
+				  ScanDirection dir, bool *continuescan)
 {
-	TupleDesc	tupdesc;
-	BTScanOpaque so;
-	int			keysz;
 	int			ikey;
 	ScanKey		key;
 
-	Assert(BTreeTupleGetNAtts(tuple, scan->indexRelation) == tupnatts);
-
 	*continuescan = true;		/* default assumption */
 
-	tupdesc = RelationGetDescr(scan->indexRelation);
-	so = (BTScanOpaque) scan->opaque;
-	keysz = so->numberOfKeys;
-
-	for (key = so->keyData, ikey = 0; ikey < keysz; key++, ikey++)
+	for (key = keyData, ikey = 0; ikey < keysz; key++, ikey++)
 	{
 		Datum		datum;
 		bool		isNull;
@@ -1523,7 +2031,7 @@ _bt_checkkeys(IndexScanDesc scan, IndexTuple tuple, int tupnatts,
  * it's not possible for any future tuples in the current scan direction
  * to pass the qual.
  *
- * This is a subroutine for _bt_checkkeys, which see for more info.
+ * This is a subroutine for _bt_check_compare, which see for more info.
  */
 static bool
 _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, int tupnatts,
@@ -1690,6 +2198,49 @@ _bt_check_rowcompare(ScanKey skey, IndexTuple tuple, int tupnatts,
 	return result;
 }
 
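+/*
+ * Perform the final _bt_checkkeys call for a forward scan's page, using the
+ * page high key stashed in pstate (when caller has one available).
+ *
+ * Called by _bt_readpage whenever it finishes reading a page without any
+ * earlier _bt_checkkeys call having set *continuescan to false.  The high
+ * key check determines whether the primitive index scan continues on to the
+ * right sibling page.  On the rightmost page there's no high key to check,
+ * so we exhaust the scan's remaining array keys via _bt_nocheckkeys instead.
+ */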
+void
+_bt_checkfinalkeys(IndexScanDesc scan, BTReadPageState *pstate)
+{
+	IndexTuple	highkey = pstate->highkey;
+
+	Assert(pstate->continuescan);
+
+	if (!pstate->highkey)
+	{
+		_bt_nocheckkeys(scan, pstate->dir);
+		pstate->continuescan = false;
+		return;
+	}
+
+	pstate->highkey = NULL;
+	_bt_checkkeys(scan, highkey, false, pstate);
+}
+
+/*
+ * Perform final steps when the "end point" is reached on the leaf level
+ * without any call to _bt_checkkeys setting *continuescan to false.
+ *
+ * Called on the rightmost page in the forward scan case, and the leftmost
+ * page in the backwards scan case.  Only call here when _bt_checkkeys hasn't
+ * already set continuescan to false.
+ *
+ * Returns true when the scan's remaining array keys were exhausted here,
+ * meaning that the top-level scan is over.  Returns false for scans without
+ * array keys, which require no final steps.
+ */
+bool
+_bt_nocheckkeys(IndexScanDesc scan, ScanDirection dir)
+{
+	BTScanOpaque so = (BTScanOpaque) scan->opaque;
+
+	/* Only need to do real work in SK_SEARCHARRAY case, for now */
+	if (!so->numArrayKeys)
+		return false;
+
+	Assert(so->arrayKeysStarted);
+
+	while (_bt_advance_array_keys(scan, dir))
+		_bt_preprocess_keys(scan);
+
+	return true;
+}
+
 /*
  * _bt_killitems - set LP_DEAD state for items an indexscan caller has
  * told us were killed
diff --git a/src/backend/optimizer/path/indxpath.c b/src/backend/optimizer/path/indxpath.c
index 6a93d767a..73064758d 100644
--- a/src/backend/optimizer/path/indxpath.c
+++ b/src/backend/optimizer/path/indxpath.c
@@ -32,6 +32,7 @@
 #include "optimizer/paths.h"
 #include "optimizer/prep.h"
 #include "optimizer/restrictinfo.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/selfuncs.h"
 
@@ -107,7 +108,7 @@ static List *build_index_paths(PlannerInfo *root, RelOptInfo *rel,
 							   bool useful_predicate,
 							   ScanTypeControl scantype,
 							   bool *skip_nonnative_saop,
-							   bool *skip_lower_saop);
+							   bool *skip_unordered_saop);
 static List *build_paths_for_OR(PlannerInfo *root, RelOptInfo *rel,
 								List *clauses, List *other_clauses);
 static List *generate_bitmap_or_paths(PlannerInfo *root, RelOptInfo *rel,
@@ -706,8 +707,8 @@ eclass_already_used(EquivalenceClass *parent_ec, Relids oldrelids,
  * index AM supports them natively, we should just include them in simple
  * index paths.  If not, we should exclude them while building simple index
  * paths, and then make a separate attempt to include them in bitmap paths.
- * Furthermore, we should consider excluding lower-order ScalarArrayOpExpr
- * quals so as to create ordered paths.
+ * Furthermore, we should consider excluding ScalarArrayOpExpr quals whose
+ * inclusion would force the path as a whole to be unordered.
  */
 static void
 get_index_paths(PlannerInfo *root, RelOptInfo *rel,
@@ -716,28 +717,28 @@ get_index_paths(PlannerInfo *root, RelOptInfo *rel,
 {
 	List	   *indexpaths;
 	bool		skip_nonnative_saop = false;
-	bool		skip_lower_saop = false;
+	bool		skip_unordered_saop = false;
 	ListCell   *lc;
 
 	/*
 	 * Build simple index paths using the clauses.  Allow ScalarArrayOpExpr
 	 * clauses only if the index AM supports them natively, and skip any such
-	 * clauses for index columns after the first (so that we produce ordered
-	 * paths if possible).
+	 * clauses whose inclusion would make it impossible to produce ordered
+	 * paths.
 	 */
 	indexpaths = build_index_paths(root, rel,
 								   index, clauses,
 								   index->predOK,
 								   ST_ANYSCAN,
 								   &skip_nonnative_saop,
-								   &skip_lower_saop);
+								   &skip_unordered_saop);
 
 	/*
-	 * If we skipped any lower-order ScalarArrayOpExprs on an index with an AM
-	 * that supports them, then try again including those clauses.  This will
-	 * produce paths with more selectivity but no ordering.
+	 * If we skipped any ScalarArrayOpExpr clauses to preserve index ordering,
+	 * on an index whose AM supports them natively, then try again including
+	 * those clauses.  This will produce paths with more selectivity but no
+	 * ordering.
 	 */
-	if (skip_lower_saop)
+	if (skip_unordered_saop)
 	{
 		indexpaths = list_concat(indexpaths,
 								 build_index_paths(root, rel,
@@ -817,11 +818,9 @@ get_index_paths(PlannerInfo *root, RelOptInfo *rel,
  * to true if we found any such clauses (caller must initialize the variable
  * to false).  If it's NULL, we do not ignore ScalarArrayOpExpr clauses.
  *
- * If skip_lower_saop is non-NULL, we ignore ScalarArrayOpExpr clauses for
- * non-first index columns, and we set *skip_lower_saop to true if we found
- * any such clauses (caller must initialize the variable to false).  If it's
- * NULL, we do not ignore non-first ScalarArrayOpExpr clauses, but they will
- * result in considering the scan's output to be unordered.
+ * If skip_unordered_saop is non-NULL, we ignore ScalarArrayOpExpr clauses
+ * whose inclusion would force us to treat the scan's output as unordered,
+ * and we set *skip_unordered_saop to true if we found any such clauses
+ * (caller must initialize the variable to false).  If it's NULL, we include
+ * such clauses, producing paths with greater selectivity but no ordering.
  *
  * 'rel' is the index's heap relation
  * 'index' is the index for which we want to generate paths
@@ -829,7 +828,7 @@ get_index_paths(PlannerInfo *root, RelOptInfo *rel,
  * 'useful_predicate' indicates whether the index has a useful predicate
  * 'scantype' indicates whether we need plain or bitmap scan support
  * 'skip_nonnative_saop' indicates whether to accept SAOP if index AM doesn't
- * 'skip_lower_saop' indicates whether to accept non-first-column SAOP
+ * 'skip_unordered_saop' indicates whether to accept unordered SAOPs
  */
 static List *
 build_index_paths(PlannerInfo *root, RelOptInfo *rel,
@@ -837,7 +836,7 @@ build_index_paths(PlannerInfo *root, RelOptInfo *rel,
 				  bool useful_predicate,
 				  ScanTypeControl scantype,
 				  bool *skip_nonnative_saop,
-				  bool *skip_lower_saop)
+				  bool *skip_unordered_saop)
 {
 	List	   *result = NIL;
 	IndexPath  *ipath;
@@ -848,10 +847,13 @@ build_index_paths(PlannerInfo *root, RelOptInfo *rel,
 	List	   *orderbyclausecols;
 	List	   *index_pathkeys;
 	List	   *useful_pathkeys;
-	bool		found_lower_saop_clause;
+	bool		row_compare_seen_already;
+	bool		saop_included_already;
+	bool		saop_invalidates_ordering;
 	bool		pathkeys_possibly_useful;
 	bool		index_is_ordered;
 	bool		index_only_scan;
+	int			prev_equality_indexcol;
 	int			indexcol;
 
 	/*
@@ -880,25 +882,27 @@ build_index_paths(PlannerInfo *root, RelOptInfo *rel,
 	 * on by btree and possibly other places.)  The list can be empty, if the
 	 * index AM allows that.
 	 *
-	 * found_lower_saop_clause is set true if we accept a ScalarArrayOpExpr
-	 * index clause for a non-first index column.  This prevents us from
-	 * assuming that the scan result is ordered.  (Actually, the result is
-	 * still ordered if there are equality constraints for all earlier
-	 * columns, but it seems too expensive and non-modular for this code to be
-	 * aware of that refinement.)
+	 * saop_invalidates_ordering is set true if we accept a ScalarArrayOpExpr
+	 * index clause that invalidates the sort order.  In practice this only
+	 * happens with a clause on a non-first index column, where earlier
+	 * columns lack complete equality constraints.  This prevents us from
+	 * assuming that the scan result is ordered.
 	 *
 	 * We also build a Relids set showing which outer rels are required by the
 	 * selected clauses.  Any lateral_relids are included in that, but not
 	 * otherwise accounted for.
 	 */
 	index_clauses = NIL;
-	found_lower_saop_clause = false;
+	prev_equality_indexcol = -1;
+	row_compare_seen_already = false;
+	saop_included_already = false;
+	saop_invalidates_ordering = false;
 	outer_relids = bms_copy(rel->lateral_relids);
 	for (indexcol = 0; indexcol < index->nkeycolumns; indexcol++)
 	{
+		List	   *colclauses = clauses->indexclauses[indexcol];
 		ListCell   *lc;
 
-		foreach(lc, clauses->indexclauses[indexcol])
+		foreach(lc, colclauses)
 		{
 			IndexClause *iclause = (IndexClause *) lfirst(lc);
 			RestrictInfo *rinfo = iclause->rinfo;
@@ -906,6 +910,8 @@ build_index_paths(PlannerInfo *root, RelOptInfo *rel,
 			/* We might need to omit ScalarArrayOpExpr clauses */
 			if (IsA(rinfo->clause, ScalarArrayOpExpr))
 			{
+				ScalarArrayOpExpr *saop = (ScalarArrayOpExpr *) rinfo->clause;
+
 				if (!index->amsearcharray)
 				{
 					if (skip_nonnative_saop)
@@ -916,18 +922,152 @@ build_index_paths(PlannerInfo *root, RelOptInfo *rel,
 					}
 					/* Caller had better intend this only for bitmap scan */
 					Assert(scantype == ST_BITMAPSCAN);
+					saop_invalidates_ordering = true;	/* defensive */
+					goto include_clause;
 				}
-				if (indexcol > 0)
+
+				/*
+				 * Index AM that handles ScalarArrayOpExpr quals natively.
+				 *
+				 * We assume that it's always better to apply a clause as an
+				 * indexqual than as a filter (qpqual); which is where an
+				 * available clause would end up being applied if we omit it
+				 * from the indexquals.
+				 *
+				 * XXX Currently, nbtree just assumes that all SK_SEARCHARRAY
+				 * search-type scankeys will be marked as required, with the
+				 * exception of the first attribute without an "=" key (any
+				 * such attribute is marked SK_BT_REQFWD or SK_BT_REQBKWD, but
+				 * it won't be in the initial positioning insertion scan key,
+				 * so _bt_tuple_advances_keys() won't consider it).
+				 */
+				if (row_compare_seen_already)
 				{
-					if (skip_lower_saop)
+					/*
+					 * Cannot safely include a ScalarArrayOpExpr after a
+					 * higher-order RowCompareExpr (barring the "=" case).
+					 */
+					Assert(indexcol > 0);
+					continue;
+				}
+
+				/*
+				 * Make a blanket assumption that any index column with more
+				 * than a single clause cannot include ScalarArrayOpExpr
+				 * clauses >= that column.  Quals like "WHERE my_col in (1,2)
+				 * AND my_col < 1" are unsafe without this.
+				 *
+				 * XXX This is overkill.
+				 */
+				if (list_length(colclauses) > 1)
+					continue;
+
+				if (indexcol != prev_equality_indexcol + 1)
+				{
+					/*
+					 * Some earlier index attribute lacks an equality
+					 * constraint (it might lack an included clause
+					 * altogether).  This may make it unsafe to include this
+					 * ScalarArrayOpExpr clause now.
+					 */
+					if (saop_included_already)
 					{
-						/* Caller doesn't want to lose index ordering */
-						*skip_lower_saop = true;
+						/*
+						 * We included at least one ScalarArrayOpExpr clause
+						 * earlier, too.  (This must have been included before
+						 * the inequality, since we treat ScalarArrayOpExpr
+						 * clauses as equality constraints by default.)
+						 *
+						 * We cannot safely include this ScalarArrayOpExpr as
+						 * a clause for the current index path.  It'll become
+						 * qpqual conditions instead.
+						 */
 						continue;
 					}
-					found_lower_saop_clause = true;
+
+					/*
+					 * This particular ScalarArrayOpExpr happens to be the
+					 * most significant one encountered so far.  That makes it
+					 * safe to include, despite gaps in constraints on prior
+					 * index columns -- provided we invalidate ordering for
+					 * the index path as a whole.
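+					 *
+					 * For example (hypothetical qual), with an index on
+					 * (a, b) and "WHERE a > 5 AND b IN (1, 2)", the SAOP on
+					 * b can be included, though only in a path whose output
+					 * is treated as unordered.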
+					 */
+					if (skip_unordered_saop)
+					{
+						/* Caller doesn't want to lose index ordering */
+						*skip_unordered_saop = true;
+						continue;
+					}
+
+					/* Caller prioritizes selectivity over ordering */
+					saop_invalidates_ordering = true;
 				}
+
+				/*
+				 * Includable ScalarArrayOpExpr clauses are themselves
+				 * equality constraints (they don't make the inclusion of
+				 * further ScalarArrayOpExpr clauses invalidate ordering).
+				 *
+				 * XXX excludes inequality-type SAOPs using get_oprrest, which
+				 * seems particularly kludgey.
+				 */
+				saop_included_already = true;
+				if (saop->useOr && get_oprrest(saop->opno) == F_EQSEL)
+					prev_equality_indexcol = indexcol;
 			}
+			else if (IsA(rinfo->clause, NullTest))
+			{
+				NullTest   *nulltest = (NullTest *) rinfo->clause;
+
+				/*
+				 * Like ScalarArrayOpExpr clauses, IS NULL NullTest clauses
+				 * are treated as equality conditions, despite not being
+				 * recognized as such by the equivalence class machinery.
+				 *
+				 * This relies on the assumption that amsearcharray index AMs
+				 * will treat NULL as just another value from the domain of
+				 * indexed values for initial search purposes.
+				 */
+				if (!nulltest->argisrow && nulltest->nulltesttype == IS_NULL)
+					prev_equality_indexcol = indexcol;
+			}
+			else if (IsA(rinfo->clause, RowCompareExpr))
+			{
+				/*
+				 * RowCompareExpr clause will make it unsafe to include any
+				 * ScalarArrayOpExpr encountered in lower-order clauses.
+				 * (Already-included ScalarArrayOpExpr clauses can stay.)
+				 */
+				row_compare_seen_already = true;
+			}
+			else if (rinfo->mergeopfamilies)
+			{
+				/*
+				 * Equality constraint clause -- won't make it unsafe to
+				 * include later ScalarArrayOpExpr clauses
+				 */
+				prev_equality_indexcol = indexcol;
+			}
+			else
+			{
+				/*
+				 * Clause isn't an equality condition according to the
+				 * equivalence class machinery (and isn't a NullTest or
+				 * ScalarArrayOpExpr, either).
+				 *
+				 * If there are any later ScalarArrayOpExpr clauses, they must
+				 * not be used as index quals.  We'll either make it safe by
+				 * setting saop_invalidates_ordering to true, or by just not
+				 * including them (they can still be qpqual conditions).
+				 *
+				 * Note: there are several interesting types of expressions
+				 * that we deem incompatible with ScalarArrayOpExpr clauses
+				 * due to a lack of infrastructure to perform transformations
+				 * of predicates from CNF (conjunctive normal form) to DNF
+				 * (disjunctive normal form).  The MDAM paper describes many
+				 * examples of these transformations.
+				 */
+			}
+
+	include_clause:
 
 			/* OK to include this clause */
 			index_clauses = lappend(index_clauses, iclause);
@@ -960,7 +1100,7 @@ build_index_paths(PlannerInfo *root, RelOptInfo *rel,
 	 * assume the scan is unordered.
 	 */
 	pathkeys_possibly_useful = (scantype != ST_BITMAPSCAN &&
-								!found_lower_saop_clause &&
+								!saop_invalidates_ordering &&
 								has_useful_pathkeys(root, rel));
 	index_is_ordered = (index->sortopfamily != NULL);
 	if (index_is_ordered && pathkeys_possibly_useful)
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
index c4fcd0076..51de102b0 100644
--- a/src/backend/utils/adt/selfuncs.c
+++ b/src/backend/utils/adt/selfuncs.c
@@ -6700,9 +6700,9 @@ btcostestimate(PlannerInfo *root, IndexPath *path, double loop_count,
 	 * For a RowCompareExpr, we consider only the first column, just as
 	 * rowcomparesel() does.
 	 *
-	 * If there's a ScalarArrayOpExpr in the quals, we'll actually perform N
-	 * index scans not one, but the ScalarArrayOpExpr's operator can be
-	 * considered to act the same as it normally does.
+	 * If there's a ScalarArrayOpExpr in the quals, we'll perform N primitive
+	 * index scans in the worst case.  Assume that worst case, for now.  We'll
+	 * clamp later on if the tally approaches the total number of index pages.
 	 */
 	indexBoundQuals = NIL;
 	indexcol = 0;
@@ -6754,7 +6754,15 @@ btcostestimate(PlannerInfo *root, IndexPath *path, double loop_count,
 
 				clause_op = saop->opno;
 				found_saop = true;
-				/* count number of SA scans induced by indexBoundQuals only */
+
+				/*
+				 * Count number of SA scans induced by indexBoundQuals only.
+				 *
+				 * Since this is multiplicative, it can wildly inflate the
+				 * assumed number of descents (number of primitive index
+				 * scans) for scans with several SAOP clauses.  We might clamp
+				 * num_sa_scans later on to deal with this.
+				 */
 				if (alength > 1)
 					num_sa_scans *= alength;
 			}
@@ -6832,6 +6840,39 @@ btcostestimate(PlannerInfo *root, IndexPath *path, double loop_count,
 
 	genericcostestimate(root, path, loop_count, &costs);
 
+	/*
+	 * The btree index AM will automatically combine individual primitive
+	 * index scans whenever the tuples covered by the next set of array keys
+	 * are close to tuples covered by the current set.  This optimization
+	 * makes the final number of descents particularly difficult to estimate.
+	 * However, btree scans never visit any single leaf page more than once.
+	 * That puts a natural ceiling on the worst case number of descents.
+	 *
+	 * Clamp the number of descents to the estimated number of leaf page
+	 * visits.  This is still fairly pessimistic, but tends to result in more
+	 * accurate costing of scans with several SAOP clauses -- especially when
+	 * each array has more than a few elements.
+	 *
+	 * Also clamp the number of descents to 1/3 the number of index pages.
+	 * This avoids implausibly high estimates with low selectivity paths,
+	 * where scans frequently require no more than one or two descents.
+	 *
+	 * XXX genericcostestimate is still the dominant influence on the total
+	 * cost of SAOP-heavy index paths -- indexTotalCost is still calculated in
+	 * a way that assumes significant repeat access to leaf pages for a path
+	 * with SAOP clauses.  This just isn't sensible anymore.  Note that nbtree
+	 * scans promise to avoid accessing any leaf page more than once.  The
+	 * worst case I/O cost of an SAOP-heavy path is therefore guaranteed to
+	 * never exceed the I/O cost of a conventional full index scan (though
+	 * this relies on standard assumptions about internal page access costs).
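+	 *
+	 * For example (made-up numbers): three SAOP clauses with 10 elements
+	 * each yield num_sa_scans = 1000.  Against an index with 600 pages and
+	 * an estimated 150 leaf page visits, we clamp that to Min(1000, 150,
+	 * 600 / 3) = 150 descents.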
+	 */
+	if (num_sa_scans > 1)
+	{
+		num_sa_scans = Min(num_sa_scans, costs.numIndexPages);
+		num_sa_scans = Min(num_sa_scans, index->pages / 3);
+		num_sa_scans = Max(num_sa_scans, 1);
+	}
+
 	/*
 	 * Add a CPU-cost component to represent the costs of initial btree
 	 * descent.  We don't charge any I/O cost for touching upper btree levels,
@@ -6847,7 +6888,7 @@ btcostestimate(PlannerInfo *root, IndexPath *path, double loop_count,
 	{
 		descentCost = ceil(log(index->tuples) / log(2.0)) * cpu_operator_cost;
 		costs.indexStartupCost += descentCost;
-		costs.indexTotalCost += costs.num_sa_scans * descentCost;
+		costs.indexTotalCost += num_sa_scans * descentCost;
 	}
 
 	/*
@@ -6858,11 +6899,12 @@ btcostestimate(PlannerInfo *root, IndexPath *path, double loop_count,
 	 * in cases where only a single leaf page is expected to be visited.  This
 	 * cost is somewhat arbitrarily set at 50x cpu_operator_cost per page
 	 * touched.  The number of such pages is btree tree height plus one (ie,
-	 * we charge for the leaf page too).  As above, charge once per SA scan.
+	 * we charge for the leaf page too).  As above, charge once per estimated
+	 * primitive SA scan.
 	 */
 	descentCost = (index->tree_height + 1) * DEFAULT_PAGE_CPU_MULTIPLIER * cpu_operator_cost;
 	costs.indexStartupCost += descentCost;
-	costs.indexTotalCost += costs.num_sa_scans * descentCost;
+	costs.indexTotalCost += num_sa_scans * descentCost;
 
 	/*
 	 * If we can get an estimate of the first column's ordering correlation C
-- 
2.40.1

