From a9d74e3bc53df673f9f9198e6432b03130f420af Mon Sep 17 00:00:00 2001
From: Edmund Horner <ejrh00@gmail.com>
Date: Fri, 12 Oct 2018 16:28:19 +1300
Subject: [PATCH 2/4] Support range quals in Tid Scan

This means queries with expressions such as "ctid >= ? AND ctid < ?" can be
answered by scanning over that part of a table, rather than falling back to a
full SeqScan.
---
 src/backend/executor/nodeTidscan.c      | 858 ++++++++++++++++++++++++--------
 src/backend/optimizer/path/costsize.c   |  43 +-
 src/backend/optimizer/path/tidpath.c    | 145 ++++--
 src/backend/optimizer/plan/createplan.c |  27 +-
 src/include/catalog/pg_operator.dat     |   6 +-
 src/include/nodes/execnodes.h           |  24 +-
 src/include/nodes/relation.h            |  13 +-
 src/test/regress/expected/tidscan.out   | 250 ++++++++++
 src/test/regress/sql/tidscan.sql        |  76 +++
 9 files changed, 1180 insertions(+), 262 deletions(-)

diff --git a/src/backend/executor/nodeTidscan.c b/src/backend/executor/nodeTidscan.c
index bc859e3..3897b97 100644
--- a/src/backend/executor/nodeTidscan.c
+++ b/src/backend/executor/nodeTidscan.c
@@ -22,7 +22,9 @@
  */
 #include "postgres.h"
 
+#include "access/relscan.h"
 #include "access/sysattr.h"
+#include "catalog/pg_operator.h"
 #include "catalog/pg_type.h"
 #include "executor/execdebug.h"
 #include "executor/nodeTidscan.h"
@@ -39,21 +41,132 @@
 	 ((Var *) (node))->varattno == SelfItemPointerAttributeNumber && \
 	 ((Var *) (node))->varlevelsup == 0)
 
+typedef enum
+{
+	TIDEXPR_IN_ARRAY,
+	TIDEXPR_EQ,
+	TIDEXPR_UPPER_BOUND,
+	TIDEXPR_LOWER_BOUND
+}			TidExprType;
+
+/* one element in TidExpr's opexprs */
+typedef struct TidOpExpr
+{
+	TidExprType exprtype;		/* type of op */
+	ExprState  *exprstate;		/* ExprState for a TID-yielding subexpr */
+	bool		inclusive;		/* whether op is inclusive */
+}			TidOpExpr;
+
 /* one element in tss_tidexprs */
 typedef struct TidExpr
 {
-	ExprState  *exprstate;		/* ExprState for a TID-yielding subexpr */
-	bool		isarray;		/* if true, it yields tid[] not just tid */
-	CurrentOfExpr *cexpr;		/* alternatively, we can have CURRENT OF */
+	List	   *opexprs;		/* list of individual op exprs */
+	CurrentOfExpr *cexpr;		/* For TIDEXPR_CURRENT_OF */
 } TidExpr;
 
+typedef struct TidRange
+{
+	ItemPointerData first;
+	ItemPointerData last;
+}			TidRange;
+
+static TidOpExpr * MakeTidOpExpr(OpExpr *expr, TidScanState *tidstate);
+static TidOpExpr * MakeTidScalarArrayOpExpr(ScalarArrayOpExpr *saop, TidScanState *tidstate);
+static List *MakeTidOpExprList(List *exprs, TidScanState *tidstate);
 static void TidExprListCreate(TidScanState *tidstate);
+static TidRange * EnsureTidRangeSpace(TidRange * tidRanges, int numRanges, int *numAllocRanges,
+									  int numNewItems);
+static bool SetTidLowerBound(ItemPointer tid, bool inclusive, ItemPointer lowerBound);
+static bool SetTidUpperBound(ItemPointer tid, bool inclusive, ItemPointer upperBound);
 static void TidListEval(TidScanState *tidstate);
-static int	itemptr_comparator(const void *a, const void *b);
+static bool MergeTidRanges(TidRange * a, TidRange * b);
+static int	tidrange_comparator(const void *a, const void *b);
+static HeapScanDesc BeginTidRangeScan(TidScanState *node, TidRange * range);
+static HeapTuple NextInTidRange(HeapScanDesc scandesc, ScanDirection direction, TidRange * range);
 static TupleTableSlot *TidNext(TidScanState *node);
 
 
 /*
+ * For the given 'expr', build and return an appropriate TidOpExpr taking into
+ * account the expr's operator and operand order.
+ */
+static TidOpExpr *
+MakeTidOpExpr(OpExpr *expr, TidScanState *tidstate)
+{
+	Node	   *arg1 = get_leftop((Expr *) expr);
+	Node	   *arg2 = get_rightop((Expr *) expr);
+	ExprState  *exprstate = NULL;
+	bool		invert = false;
+	TidOpExpr  *tidopexpr;
+
+	if (IsCTIDVar(arg1))
+		exprstate = ExecInitExpr((Expr *) arg2, &tidstate->ss.ps);
+	else if (IsCTIDVar(arg2))
+	{
+		exprstate = ExecInitExpr((Expr *) arg1, &tidstate->ss.ps);
+		invert = true;
+	}
+	else
+		elog(ERROR, "could not identify CTID variable");
+
+	tidopexpr = (TidOpExpr *) palloc0(sizeof(TidOpExpr));
+
+	switch (expr->opno)
+	{
+		case TIDLessEqOperator:
+			tidopexpr->inclusive = true;
+			/* fall through */
+		case TIDLessOperator:
+			tidopexpr->exprtype = invert ? TIDEXPR_LOWER_BOUND : TIDEXPR_UPPER_BOUND;
+			break;
+		case TIDGreaterEqOperator:
+			tidopexpr->inclusive = true;
+			/* fall through */
+		case TIDGreaterOperator:
+			tidopexpr->exprtype = invert ? TIDEXPR_UPPER_BOUND : TIDEXPR_LOWER_BOUND;
+			break;
+		default:
+			tidopexpr->exprtype = TIDEXPR_EQ;
+	}
+
+	tidopexpr->exprstate = exprstate;
+
+	return tidopexpr;
+}
+
+static TidOpExpr *
+MakeTidScalarArrayOpExpr(ScalarArrayOpExpr *saop, TidScanState *tidstate)
+{
+	TidOpExpr  *tidopexpr;
+
+	Assert(IsCTIDVar(linitial(saop->args)));
+
+	tidopexpr = (TidOpExpr *) palloc0(sizeof(TidOpExpr));
+	tidopexpr->exprstate = ExecInitExpr(lsecond(saop->args),
+										&tidstate->ss.ps);
+	tidopexpr->exprtype = TIDEXPR_IN_ARRAY;
+
+	return tidopexpr;
+}
+
+static List *
+MakeTidOpExprList(List *exprs, TidScanState *tidstate)
+{
+	ListCell   *l;
+	List	   *tidopexprs = NIL;
+
+	foreach(l, exprs)
+	{
+		OpExpr	   *opexpr = lfirst(l);
+		TidOpExpr  *tidopexpr = MakeTidOpExpr(opexpr, tidstate);
+
+		tidopexprs = lappend(tidopexprs, tidopexpr);
+	}
+
+	return tidopexprs;
+}
+
+/*
  * Extract the qual subexpressions that yield TIDs to search for,
  * and compile them into ExprStates if they're ordinary expressions.
  *
@@ -69,6 +182,17 @@ TidExprListCreate(TidScanState *tidstate)
 	tidstate->tss_tidexprs = NIL;
 	tidstate->tss_isCurrentOf = false;
 
+	/*
+	 * If no quals were specified, then a complete scan is assumed.  Make a
+	 * TidExpr with an empty list of TidOpExprs.
+	 */
+	if (!node->tidquals)
+	{
+		TidExpr    *tidexpr = (TidExpr *) palloc0(sizeof(TidExpr));
+
+		tidstate->tss_tidexprs = lappend(tidstate->tss_tidexprs, tidexpr);
+	}
+
 	foreach(l, node->tidquals)
 	{
 		Expr	   *expr = (Expr *) lfirst(l);
@@ -76,37 +200,30 @@ TidExprListCreate(TidScanState *tidstate)
 
 		if (is_opclause(expr))
 		{
-			Node	   *arg1;
-			Node	   *arg2;
-
-			arg1 = get_leftop(expr);
-			arg2 = get_rightop(expr);
-			if (IsCTIDVar(arg1))
-				tidexpr->exprstate = ExecInitExpr((Expr *) arg2,
-												  &tidstate->ss.ps);
-			else if (IsCTIDVar(arg2))
-				tidexpr->exprstate = ExecInitExpr((Expr *) arg1,
-												  &tidstate->ss.ps);
-			else
-				elog(ERROR, "could not identify CTID variable");
-			tidexpr->isarray = false;
+			OpExpr	   *opexpr = (OpExpr *) expr;
+			TidOpExpr  *tidopexpr = MakeTidOpExpr(opexpr, tidstate);
+
+			tidexpr->opexprs = list_make1(tidopexpr);
 		}
 		else if (expr && IsA(expr, ScalarArrayOpExpr))
 		{
-			ScalarArrayOpExpr *saex = (ScalarArrayOpExpr *) expr;
+			ScalarArrayOpExpr *saop = (ScalarArrayOpExpr *) expr;
+			TidOpExpr  *tidopexpr = MakeTidScalarArrayOpExpr(saop, tidstate);
 
-			Assert(IsCTIDVar(linitial(saex->args)));
-			tidexpr->exprstate = ExecInitExpr(lsecond(saex->args),
-											  &tidstate->ss.ps);
-			tidexpr->isarray = true;
+			tidexpr->opexprs = list_make1(tidopexpr);
 		}
 		else if (expr && IsA(expr, CurrentOfExpr))
 		{
 			CurrentOfExpr *cexpr = (CurrentOfExpr *) expr;
 
+			/* For CURRENT OF, save the expression in the TidExpr. */
 			tidexpr->cexpr = cexpr;
 			tidstate->tss_isCurrentOf = true;
 		}
+		else if (and_clause((Node *) expr))
+		{
+			tidexpr->opexprs = MakeTidOpExprList(((BoolExpr *) expr)->args, tidstate);
+		}
 		else
 			elog(ERROR, "could not identify CTID expression");
 
@@ -119,7 +236,256 @@ TidExprListCreate(TidScanState *tidstate)
 }
 
 /*
- * Compute the list of TIDs to be visited, by evaluating the expressions
+ * Ensure the array of TidRange objects has enough space for new items.
+ * May reallocate array.
+ */
+static TidRange *
+EnsureTidRangeSpace(TidRange * tidRanges, int numRanges, int *numAllocRanges,
+					int numNewItems)
+{
+	if (numRanges + numNewItems > *numAllocRanges)
+	{
+		/* If growing by one, grow exponentially; otherwise, grow just enough. */
+		if (numNewItems == 1)
+			*numAllocRanges *= 2;
+		else
+			*numAllocRanges = numRanges + numNewItems;
+
+		tidRanges = (TidRange *)
+			repalloc(tidRanges,
+					 *numAllocRanges * sizeof(TidRange));
+	}
+	return tidRanges;
+}
+
+/*
+ * Set a lower bound tid, taking into account the inclusivity of the bound.
+ * Return true if the bound is valid.
+ */
+static bool
+SetTidLowerBound(ItemPointer tid, bool inclusive, ItemPointer lowerBound)
+{
+	OffsetNumber offset;
+
+	*lowerBound = *tid;
+	offset = ItemPointerGetOffsetNumberNoCheck(tid);
+
+	if (!inclusive)
+	{
+		/* Check if the lower bound is actually in the next block. */
+		if (offset >= MaxOffsetNumber)
+		{
+			BlockNumber block = ItemPointerGetBlockNumberNoCheck(lowerBound);
+
+			/*
+			 * If the lower bound was already or above at the maximum block
+			 * number, then there is no valid range.
+			 */
+			if (block >= MaxBlockNumber)
+				return false;
+
+			ItemPointerSetBlockNumber(lowerBound, block + 1);
+			ItemPointerSetOffsetNumber(lowerBound, 1);
+		}
+		else
+			ItemPointerSetOffsetNumber(lowerBound, OffsetNumberNext(offset));
+	}
+	else if (offset == 0)
+		ItemPointerSetOffsetNumber(lowerBound, 1);
+
+	return true;
+}
+
+/*
+ * Set an upper bound tid, taking into account the inclusivity of the bound.
+ * Return true if the bound is valid.
+ */
+static bool
+SetTidUpperBound(ItemPointer tid, bool inclusive, ItemPointer upperBound)
+{
+	OffsetNumber offset;
+
+	*upperBound = *tid;
+	offset = ItemPointerGetOffsetNumberNoCheck(tid);
+
+	/*
+	 * Since TID offsets start at 1, an inclusive upper bound with offset 0
+	 * can be treated as an exclusive bound.  This has the benefit of
+	 * eliminating that block from the scan range.
+	 */
+	if (inclusive && offset == 0)
+		inclusive = false;
+
+	if (!inclusive)
+	{
+		/* Check if the upper bound is actually in the previous block. */
+		if (offset == 0)
+		{
+			BlockNumber block = ItemPointerGetBlockNumberNoCheck(upperBound);
+
+			/*
+			 * If the upper bound was already in block 0, then there is no
+			 * valid range.
+			 */
+			if (block == 0)
+				return false;
+
+			ItemPointerSetBlockNumber(upperBound, block - 1);
+			ItemPointerSetOffsetNumber(upperBound, MaxOffsetNumber);
+		}
+		else
+			ItemPointerSetOffsetNumber(upperBound, OffsetNumberPrev(offset));
+	}
+
+	return true;
+}
+
+static void
+TidInArrayExprEval(TidOpExpr * tidopexpr, BlockNumber nblocks, TidScanState *tidstate,
+				   TidRange * *tidRanges, int *numRanges, int *numAllocRanges)
+{
+	ExprContext *econtext = tidstate->ss.ps.ps_ExprContext;
+	bool		isNull;
+	Datum		arraydatum;
+	ArrayType  *itemarray;
+	Datum	   *ipdatums;
+	bool	   *ipnulls;
+	int			ndatums;
+	int			i;
+
+	arraydatum = ExecEvalExprSwitchContext(tidopexpr->exprstate,
+										   econtext,
+										   &isNull);
+	if (isNull)
+		return;
+
+	itemarray = DatumGetArrayTypeP(arraydatum);
+	deconstruct_array(itemarray,
+					  TIDOID, sizeof(ItemPointerData), false, 's',
+					  &ipdatums, &ipnulls, &ndatums);
+
+	*tidRanges = EnsureTidRangeSpace(*tidRanges, *numRanges, numAllocRanges, ndatums);
+
+	for (i = 0; i < ndatums; i++)
+	{
+		if (!ipnulls[i])
+		{
+			ItemPointer itemptr = (ItemPointer) DatumGetPointer(ipdatums[i]);
+
+			if (ItemPointerIsValid(itemptr) &&
+				ItemPointerGetBlockNumber(itemptr) < nblocks)
+			{
+				(*tidRanges)[*numRanges].first = *itemptr;
+				(*tidRanges)[*numRanges].last = *itemptr;
+				(*numRanges)++;
+			}
+		}
+	}
+	pfree(ipdatums);
+	pfree(ipnulls);
+}
+
+static void
+TidExprEval(TidExpr *expr, BlockNumber nblocks, TidScanState *tidstate,
+			TidRange * *tidRanges, int *numRanges, int *numAllocRanges)
+{
+	ExprContext *econtext = tidstate->ss.ps.ps_ExprContext;
+	ListCell   *l;
+	ItemPointerData lowerBound;
+	ItemPointerData upperBound;
+
+	/* The biggest range on an empty table is empty; just skip it. */
+	if (nblocks == 0)
+		return;
+
+	/* Set the lower and upper bound to scan the whole table. */
+	ItemPointerSetBlockNumber(&lowerBound, 0);
+	ItemPointerSetOffsetNumber(&lowerBound, 1);
+	ItemPointerSetBlockNumber(&upperBound, nblocks - 1);
+	ItemPointerSetOffsetNumber(&upperBound, MaxOffsetNumber);
+
+	foreach(l, expr->opexprs)
+	{
+		TidOpExpr  *tidopexpr = (TidOpExpr *) lfirst(l);
+
+		if (tidopexpr->exprtype == TIDEXPR_IN_ARRAY)
+		{
+			TidInArrayExprEval(tidopexpr, nblocks, tidstate,
+							   tidRanges, numRanges, numAllocRanges);
+
+			/*
+			 * A CTID = ANY expression only exists by itself; there shouldn't
+			 * be any other quals alongside it.  TidInArrayExprEval has
+			 * already added the ranges, so just return here.
+			 */
+			Assert(list_length(expr->opexprs) == 1);
+			return;
+		}
+		else
+		{
+			ItemPointer itemptr;
+			bool		isNull;
+
+			/* Evaluate this bound. */
+			itemptr = (ItemPointer)
+				DatumGetPointer(ExecEvalExprSwitchContext(tidopexpr->exprstate,
+														  econtext,
+														  &isNull));
+
+			/* If the bound is NULL, *nothing* matches the qual. */
+			if (isNull)
+				return;
+
+			if (tidopexpr->exprtype == TIDEXPR_EQ && ItemPointerIsValid(itemptr))
+			{
+				lowerBound = *itemptr;
+				upperBound = *itemptr;
+
+				/*
+				 * A CTID = ? expression only exists by itself, so set the
+				 * range to this single TID, and exit the loop (the remainder
+				 * of this function will add the range).
+				 */
+				Assert(list_length(expr->opexprs) == 1);
+				break;
+			}
+
+			if (tidopexpr->exprtype == TIDEXPR_LOWER_BOUND)
+			{
+				ItemPointerData lb;
+
+				if (!SetTidLowerBound(itemptr, tidopexpr->inclusive, &lb))
+					return;
+
+				if (ItemPointerCompare(&lb, &lowerBound) > 0)
+					lowerBound = lb;
+			}
+
+			if (tidopexpr->exprtype == TIDEXPR_UPPER_BOUND)
+			{
+				ItemPointerData ub;
+
+				if (!SetTidUpperBound(itemptr, tidopexpr->inclusive, &ub))
+					return;
+
+				if (ItemPointerCompare(&ub, &upperBound) < 0)
+					upperBound = ub;
+			}
+		}
+	}
+
+	/* If the resulting range is not empty, add it to the array. */
+	if (ItemPointerCompare(&lowerBound, &upperBound) <= 0)
+	{
+		*tidRanges = EnsureTidRangeSpace(*tidRanges, *numRanges, numAllocRanges, 1);
+		(*tidRanges)[*numRanges].first = lowerBound;
+		(*tidRanges)[*numRanges].last = upperBound;
+		(*numRanges)++;
+	}
+}
+
+/*
+ * Compute the list of TID ranges to be visited, by evaluating the expressions
  * for them.
  *
  * (The result is actually an array, not a list.)
@@ -129,9 +495,9 @@ TidListEval(TidScanState *tidstate)
 {
 	ExprContext *econtext = tidstate->ss.ps.ps_ExprContext;
 	BlockNumber nblocks;
-	ItemPointerData *tidList;
-	int			numAllocTids;
-	int			numTids;
+	TidRange   *tidRanges;
+	int			numAllocRanges;
+	int			numRanges;
 	ListCell   *l;
 
 	/*
@@ -147,76 +513,15 @@ TidListEval(TidScanState *tidstate)
 	 * are simple OpExprs or CurrentOfExprs.  If there are any
 	 * ScalarArrayOpExprs, we may have to enlarge the array.
 	 */
-	numAllocTids = list_length(tidstate->tss_tidexprs);
-	tidList = (ItemPointerData *)
-		palloc(numAllocTids * sizeof(ItemPointerData));
-	numTids = 0;
+	numAllocRanges = list_length(tidstate->tss_tidexprs);
+	tidRanges = (TidRange *) palloc0(numAllocRanges * sizeof(TidRange));
+	numRanges = 0;
 
 	foreach(l, tidstate->tss_tidexprs)
 	{
 		TidExpr    *tidexpr = (TidExpr *) lfirst(l);
-		ItemPointer itemptr;
-		bool		isNull;
 
-		if (tidexpr->exprstate && !tidexpr->isarray)
-		{
-			itemptr = (ItemPointer)
-				DatumGetPointer(ExecEvalExprSwitchContext(tidexpr->exprstate,
-														  econtext,
-														  &isNull));
-			if (!isNull &&
-				ItemPointerIsValid(itemptr) &&
-				ItemPointerGetBlockNumber(itemptr) < nblocks)
-			{
-				if (numTids >= numAllocTids)
-				{
-					numAllocTids *= 2;
-					tidList = (ItemPointerData *)
-						repalloc(tidList,
-								 numAllocTids * sizeof(ItemPointerData));
-				}
-				tidList[numTids++] = *itemptr;
-			}
-		}
-		else if (tidexpr->exprstate && tidexpr->isarray)
-		{
-			Datum		arraydatum;
-			ArrayType  *itemarray;
-			Datum	   *ipdatums;
-			bool	   *ipnulls;
-			int			ndatums;
-			int			i;
-
-			arraydatum = ExecEvalExprSwitchContext(tidexpr->exprstate,
-												   econtext,
-												   &isNull);
-			if (isNull)
-				continue;
-			itemarray = DatumGetArrayTypeP(arraydatum);
-			deconstruct_array(itemarray,
-							  TIDOID, sizeof(ItemPointerData), false, 's',
-							  &ipdatums, &ipnulls, &ndatums);
-			if (numTids + ndatums > numAllocTids)
-			{
-				numAllocTids = numTids + ndatums;
-				tidList = (ItemPointerData *)
-					repalloc(tidList,
-							 numAllocTids * sizeof(ItemPointerData));
-			}
-			for (i = 0; i < ndatums; i++)
-			{
-				if (!ipnulls[i])
-				{
-					itemptr = (ItemPointer) DatumGetPointer(ipdatums[i]);
-					if (ItemPointerIsValid(itemptr) &&
-						ItemPointerGetBlockNumber(itemptr) < nblocks)
-						tidList[numTids++] = *itemptr;
-				}
-			}
-			pfree(ipdatums);
-			pfree(ipnulls);
-		}
-		else
+		if (tidexpr->cexpr)
 		{
 			ItemPointerData cursor_tid;
 
@@ -225,16 +530,17 @@ TidListEval(TidScanState *tidstate)
 							  RelationGetRelid(tidstate->ss.ss_currentRelation),
 							  &cursor_tid))
 			{
-				if (numTids >= numAllocTids)
-				{
-					numAllocTids *= 2;
-					tidList = (ItemPointerData *)
-						repalloc(tidList,
-								 numAllocTids * sizeof(ItemPointerData));
-				}
-				tidList[numTids++] = cursor_tid;
+				tidRanges = EnsureTidRangeSpace(tidRanges, numRanges, &numAllocRanges, 1);
+				tidRanges[numRanges].first = cursor_tid;
+				tidRanges[numRanges].last = cursor_tid;
+				numRanges++;
 			}
 		}
+		else
+		{
+			TidExprEval(tidexpr, nblocks, tidstate,
+						&tidRanges, &numRanges, &numAllocRanges);
+		}
 	}
 
 	/*
@@ -243,52 +549,152 @@ TidListEval(TidScanState *tidstate)
 	 * the list.  Sorting makes it easier to detect duplicates, and as a bonus
 	 * ensures that we will visit the heap in the most efficient way.
 	 */
-	if (numTids > 1)
+	if (numRanges > 1)
 	{
-		int			lastTid;
+		int			lastRange;
 		int			i;
 
 		/* CurrentOfExpr could never appear OR'd with something else */
 		Assert(!tidstate->tss_isCurrentOf);
 
-		qsort((void *) tidList, numTids, sizeof(ItemPointerData),
-			  itemptr_comparator);
-		lastTid = 0;
-		for (i = 1; i < numTids; i++)
+		qsort((void *) tidRanges, numRanges, sizeof(TidRange), tidrange_comparator);
+		lastRange = 0;
+		for (i = 1; i < numRanges; i++)
 		{
-			if (!ItemPointerEquals(&tidList[lastTid], &tidList[i]))
-				tidList[++lastTid] = tidList[i];
+			if (!MergeTidRanges(&tidRanges[lastRange], &tidRanges[i]))
+				tidRanges[++lastRange] = tidRanges[i];
 		}
-		numTids = lastTid + 1;
+		numRanges = lastRange + 1;
 	}
 
-	tidstate->tss_TidList = tidList;
-	tidstate->tss_NumTids = numTids;
-	tidstate->tss_TidPtr = -1;
+	tidstate->tss_TidRanges = tidRanges;
+	tidstate->tss_NumTidRanges = numRanges;
+	tidstate->tss_TidRangePtr = -1;
+}
+
+/*
+ * If two ranges overlap, merge them into one.
+ * Assumes the two ranges are already ordered by (first, last).
+ * Returns true if they were merged.
+ */
+static bool
+MergeTidRanges(TidRange * a, TidRange * b)
+{
+	ItemPointerData a_last = a->last;
+	ItemPointerData b_last;
+
+	if (!ItemPointerIsValid(&a_last))
+		a_last = a->first;
+
+	/*
+	 * If the first range ends before the second one begins, they don't
+	 * overlap.
+	 */
+	if (ItemPointerCompare(&a_last, &b->first) < 0)
+		return false;
+
+	b_last = b->last;
+	if (!ItemPointerIsValid(&b_last))
+		b_last = b->first;
+
+	/*
+	 * Since they overlap, the end of the new range should be the maximum of
+	 * the original two range ends.
+	 */
+	if (ItemPointerCompare(&a_last, &b_last) < 0)
+		a->last = b->last;
+	return true;
 }
 
 /*
- * qsort comparator for ItemPointerData items
+ * qsort comparator for TidRange items
  */
 static int
-itemptr_comparator(const void *a, const void *b)
+tidrange_comparator(const void *a, const void *b)
 {
-	const ItemPointerData *ipa = (const ItemPointerData *) a;
-	const ItemPointerData *ipb = (const ItemPointerData *) b;
-	BlockNumber ba = ItemPointerGetBlockNumber(ipa);
-	BlockNumber bb = ItemPointerGetBlockNumber(ipb);
-	OffsetNumber oa = ItemPointerGetOffsetNumber(ipa);
-	OffsetNumber ob = ItemPointerGetOffsetNumber(ipb);
-
-	if (ba < bb)
-		return -1;
-	if (ba > bb)
-		return 1;
-	if (oa < ob)
-		return -1;
-	if (oa > ob)
-		return 1;
-	return 0;
+	TidRange   *tra = (TidRange *) a;
+	TidRange   *trb = (TidRange *) b;
+	int			cmp_first = ItemPointerCompare(&tra->first, &trb->first);
+
+	if (cmp_first != 0)
+		return cmp_first;
+	else
+		return ItemPointerCompare(&tra->last, &trb->last);
+}
+
+static HeapScanDesc
+BeginTidRangeScan(TidScanState *node, TidRange * range)
+{
+	HeapScanDesc scandesc = node->ss.ss_currentScanDesc;
+	BlockNumber first_block = ItemPointerGetBlockNumberNoCheck(&range->first);
+	BlockNumber last_block = ItemPointerGetBlockNumberNoCheck(&range->last);
+
+	if (!scandesc)
+	{
+		EState	   *estate = node->ss.ps.state;
+
+		scandesc = heap_beginscan_strat(node->ss.ss_currentRelation,
+										estate->es_snapshot,
+										0, NULL,
+										false, false);
+		node->ss.ss_currentScanDesc = scandesc;
+	}
+	else
+		heap_rescan(scandesc, NULL);
+
+	heap_setscanlimits(scandesc, first_block, last_block - first_block + 1);
+	node->tss_inScan = true;
+	return scandesc;
+}
+
+static HeapTuple
+NextInTidRange(HeapScanDesc scandesc, ScanDirection direction, TidRange * range)
+{
+	BlockNumber first_block = ItemPointerGetBlockNumber(&range->first);
+	OffsetNumber first_offset = ItemPointerGetOffsetNumber(&range->first);
+	BlockNumber last_block = ItemPointerGetBlockNumber(&range->last);
+	OffsetNumber last_offset = ItemPointerGetOffsetNumber(&range->last);
+	HeapTuple	tuple;
+
+	for (;;)
+	{
+		BlockNumber block;
+		OffsetNumber offset;
+
+		tuple = heap_getnext(scandesc, direction);
+		if (!tuple)
+			break;
+
+		/* Check that the tuple is within the required range. */
+		block = ItemPointerGetBlockNumber(&tuple->t_self);
+		offset = ItemPointerGetOffsetNumber(&tuple->t_self);
+
+		/*
+		 * If the tuple is in the fist block of the range and before the first
+		 * requested offset, then we can either skip it (if scanning forward),
+		 * or end the scan (if scanning backward).
+		 */
+		if (block == first_block && offset < first_offset)
+		{
+			if (ScanDirectionIsForward(direction))
+				continue;
+			else
+				return NULL;
+		}
+
+		/* Similarly for the last block, after the last requested offset. */
+		if (block == last_block && offset > last_offset)
+		{
+			if (ScanDirectionIsBackward(direction))
+				continue;
+			else
+				return NULL;
+		}
+
+		break;
+	}
+
+	return tuple;
 }
 
 /* ----------------------------------------------------------------
@@ -302,6 +708,7 @@ itemptr_comparator(const void *a, const void *b)
 static TupleTableSlot *
 TidNext(TidScanState *node)
 {
+	HeapScanDesc scandesc;
 	EState	   *estate;
 	ScanDirection direction;
 	Snapshot	snapshot;
@@ -309,105 +716,141 @@ TidNext(TidScanState *node)
 	HeapTuple	tuple;
 	TupleTableSlot *slot;
 	Buffer		buffer = InvalidBuffer;
-	ItemPointerData *tidList;
-	int			numTids;
-	bool		bBackward;
+	int			numRanges;
 
 	/*
 	 * extract necessary information from tid scan node
 	 */
+	scandesc = node->ss.ss_currentScanDesc;
 	estate = node->ss.ps.state;
 	direction = estate->es_direction;
 	snapshot = estate->es_snapshot;
 	heapRelation = node->ss.ss_currentRelation;
 	slot = node->ss.ss_ScanTupleSlot;
 
-	/*
-	 * First time through, compute the list of TIDs to be visited
-	 */
-	if (node->tss_TidList == NULL)
+	/* First time through, compute the list of TID ranges to be visited */
+	if (node->tss_TidRanges == NULL)
+	{
 		TidListEval(node);
 
-	tidList = node->tss_TidList;
-	numTids = node->tss_NumTids;
+		node->tss_TidRangePtr = -1;
+	}
 
-	/*
-	 * We use node->tss_htup as the tuple pointer; note this can't just be a
-	 * local variable here, as the scan tuple slot will keep a pointer to it.
-	 */
-	tuple = &(node->tss_htup);
+	numRanges = node->tss_NumTidRanges;
 
-	/*
-	 * Initialize or advance scan position, depending on direction.
-	 */
-	bBackward = ScanDirectionIsBackward(direction);
-	if (bBackward)
+	tuple = NULL;
+	for (;;)
 	{
-		if (node->tss_TidPtr < 0)
-		{
-			/* initialize for backward scan */
-			node->tss_TidPtr = numTids - 1;
-		}
-		else
-			node->tss_TidPtr--;
-	}
-	else
-	{
-		if (node->tss_TidPtr < 0)
+		TidRange   *currentRange;
+
+		if (!node->tss_inScan)
 		{
-			/* initialize for forward scan */
-			node->tss_TidPtr = 0;
+			/* Initialize or advance scan position, depending on direction. */
+			bool		bBackward = ScanDirectionIsBackward(direction);
+
+			if (bBackward)
+			{
+				if (node->tss_TidRangePtr < 0)
+				{
+					/* initialize for backward scan */
+					node->tss_TidRangePtr = numRanges - 1;
+				}
+				else
+					node->tss_TidRangePtr--;
+			}
+			else
+			{
+				if (node->tss_TidRangePtr < 0)
+				{
+					/* initialize for forward scan */
+					node->tss_TidRangePtr = 0;
+				}
+				else
+					node->tss_TidRangePtr++;
+			}
 		}
-		else
-			node->tss_TidPtr++;
-	}
 
-	while (node->tss_TidPtr >= 0 && node->tss_TidPtr < numTids)
-	{
-		tuple->t_self = tidList[node->tss_TidPtr];
+		if (node->tss_TidRangePtr >= numRanges || node->tss_TidRangePtr < 0)
+			break;
+
+		currentRange = &node->tss_TidRanges[node->tss_TidRangePtr];
 
 		/*
-		 * For WHERE CURRENT OF, the tuple retrieved from the cursor might
-		 * since have been updated; if so, we should fetch the version that is
-		 * current according to our snapshot.
+		 * Ranges with only one item -- including one resulting from a
+		 * CURRENT-OF qual -- are handled by looking up the item directly.
 		 */
-		if (node->tss_isCurrentOf)
-			heap_get_latest_tid(heapRelation, snapshot, &tuple->t_self);
-
-		if (heap_fetch(heapRelation, snapshot, tuple, &buffer, false, NULL))
+		if (ItemPointerEquals(&currentRange->first, &currentRange->last))
 		{
 			/*
-			 * Store the scanned tuple in the scan tuple slot of the scan
-			 * state.  Eventually we will only do this and not return a tuple.
+			 * We use node->tss_htup as the tuple pointer; note this can't
+			 * just be a local variable here, as the scan tuple slot will keep
+			 * a pointer to it.
 			 */
-			ExecStoreBufferHeapTuple(tuple, /* tuple to store */
-									 slot,	/* slot to store in */
-									 buffer);	/* buffer associated with
-												 * tuple */
+			tuple = &(node->tss_htup);
+			tuple->t_self = currentRange->first;
 
 			/*
-			 * At this point we have an extra pin on the buffer, because
-			 * ExecStoreHeapTuple incremented the pin count. Drop our local
-			 * pin.
+			 * For WHERE CURRENT OF, the tuple retrieved from the cursor might
+			 * since have been updated; if so, we should fetch the version
+			 * that is current according to our snapshot.
 			 */
-			ReleaseBuffer(buffer);
+			if (node->tss_isCurrentOf)
+				heap_get_latest_tid(heapRelation, snapshot, &tuple->t_self);
 
-			return slot;
+			if (heap_fetch(heapRelation, snapshot, tuple, &buffer, false, NULL))
+			{
+				/*
+				 * Store the scanned tuple in the scan tuple slot of the scan
+				 * state.  Eventually we will only do this and not return a
+				 * tuple.
+				 */
+				ExecStoreBufferHeapTuple(tuple, /* tuple to store */
+										 slot,	/* slot to store in */
+										 buffer);	/* buffer associated with
+													 * tuple */
+
+				/*
+				 * At this point we have an extra pin on the buffer, because
+				 * ExecStoreHeapTuple incremented the pin count. Drop our
+				 * local pin.
+				 */
+				ReleaseBuffer(buffer);
+
+				return slot;
+			}
+			else
+			{
+				tuple = NULL;
+			}
 		}
-		/* Bad TID or failed snapshot qual; try next */
-		if (bBackward)
-			node->tss_TidPtr--;
 		else
-			node->tss_TidPtr++;
+		{
+			if (!node->tss_inScan)
+				scandesc = BeginTidRangeScan(node, currentRange);
+
+			tuple = NextInTidRange(scandesc, direction, currentRange);
+			if (tuple)
+				break;
 
-		CHECK_FOR_INTERRUPTS();
+			node->tss_inScan = false;
+		}
 	}
 
 	/*
-	 * if we get here it means the tid scan failed so we are at the end of the
-	 * scan..
+	 * save the tuple and the buffer returned to us by the access methods in
+	 * our scan tuple slot and return the slot.  Note also that
+	 * ExecStoreHeapTuple will increment the refcount of the buffer; the
+	 * refcount will not be dropped until the tuple table slot is cleared.
 	 */
-	return ExecClearTuple(slot);
+	if (tuple)
+		ExecStoreBufferHeapTuple(tuple, /* tuple to store */
+								 slot,	/* slot to store in */
+								 scandesc->rs_cbuf);	/* buffer associated
+														 * with this tuple */
+	else
+		ExecClearTuple(slot);
+
+	return slot;
 }
 
 /*
@@ -460,11 +903,13 @@ ExecTidScan(PlanState *pstate)
 void
 ExecReScanTidScan(TidScanState *node)
 {
-	if (node->tss_TidList)
-		pfree(node->tss_TidList);
-	node->tss_TidList = NULL;
-	node->tss_NumTids = 0;
-	node->tss_TidPtr = -1;
+	if (node->tss_TidRanges)
+		pfree(node->tss_TidRanges);
+
+	node->tss_TidRanges = NULL;
+	node->tss_NumTidRanges = 0;
+	node->tss_TidRangePtr = -1;
+	node->tss_inScan = false;
 
 	ExecScanReScan(&node->ss);
 }
@@ -479,6 +924,8 @@ ExecReScanTidScan(TidScanState *node)
 void
 ExecEndTidScan(TidScanState *node)
 {
+	HeapScanDesc scan = node->ss.ss_currentScanDesc;
+
 	/*
 	 * Free the exprcontext
 	 */
@@ -490,6 +937,10 @@ ExecEndTidScan(TidScanState *node)
 	if (node->ss.ps.ps_ResultTupleSlot)
 		ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
 	ExecClearTuple(node->ss.ss_ScanTupleSlot);
+
+	/* close heap scan */
+	if (scan != NULL)
+		heap_endscan(scan);
 }
 
 /* ----------------------------------------------------------------
@@ -525,11 +976,12 @@ ExecInitTidScan(TidScan *node, EState *estate, int eflags)
 	ExecAssignExprContext(estate, &tidstate->ss.ps);
 
 	/*
-	 * mark tid list as not computed yet
+	 * mark tid range list as not computed yet
 	 */
-	tidstate->tss_TidList = NULL;
-	tidstate->tss_NumTids = 0;
-	tidstate->tss_TidPtr = -1;
+	tidstate->tss_TidRanges = NULL;
+	tidstate->tss_NumTidRanges = 0;
+	tidstate->tss_TidRangePtr = -1;
+	tidstate->tss_inScan = false;
 
 	/*
 	 * open the scan relation
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 7bf67a0..bffd2c0 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -1184,9 +1184,12 @@ cost_tidscan(Path *path, PlannerInfo *root,
 	QualCost	qpqual_cost;
 	Cost		cpu_per_tuple;
 	QualCost	tid_qual_cost;
-	int			ntuples;
+	double		ntuples;
+	double		nrandompages;
+	double		nseqpages;
 	ListCell   *l;
 	double		spc_random_page_cost;
+	double		spc_seq_page_cost;
 
 	/* Should only be applied to base relations */
 	Assert(baserel->relid > 0);
@@ -1198,8 +1201,10 @@ cost_tidscan(Path *path, PlannerInfo *root,
 	else
 		path->rows = baserel->rows;
 
-	/* Count how many tuples we expect to retrieve */
-	ntuples = 0;
+	/* Count how many tuples and pages we expect to retrieve */
+	ntuples = 0.0;
+	nrandompages = 0.0;
+	nseqpages = 0.0;
 	foreach(l, tidquals)
 	{
 		if (IsA(lfirst(l), ScalarArrayOpExpr))
@@ -1207,19 +1212,37 @@ cost_tidscan(Path *path, PlannerInfo *root,
 			/* Each element of the array yields 1 tuple */
 			ScalarArrayOpExpr *saop = (ScalarArrayOpExpr *) lfirst(l);
 			Node	   *arraynode = (Node *) lsecond(saop->args);
+			int			array_len = estimate_array_length(arraynode);
 
-			ntuples += estimate_array_length(arraynode);
+			ntuples += array_len;
+			nrandompages += array_len;
 		}
 		else if (IsA(lfirst(l), CurrentOfExpr))
 		{
 			/* CURRENT OF yields 1 tuple */
 			isCurrentOf = true;
-			ntuples++;
+			ntuples += 1.0;
+			nrandompages += 1.0;
 		}
 		else
 		{
-			/* It's just CTID = something, count 1 tuple */
-			ntuples++;
+			/*
+			 * For anything else, we'll use the normal selectivity estimate.
+			 * Count the first page as a random page, the rest as sequential.
+			 */
+			Selectivity selectivity = clause_selectivity(root, lfirst(l),
+														 baserel->relid,
+														 JOIN_INNER,
+														 NULL);
+			double		pages = selectivity * baserel->pages;
+
+			if (pages <= 0.0)
+				pages = 1.0;
+
+			/* TODO decide what the costs should be */
+			ntuples += selectivity * baserel->tuples;
+			nseqpages += pages - 1.0;
+			nrandompages += 1.0;
 		}
 	}
 
@@ -1248,10 +1271,10 @@ cost_tidscan(Path *path, PlannerInfo *root,
 	/* fetch estimated page cost for tablespace containing table */
 	get_tablespace_page_costs(baserel->reltablespace,
 							  &spc_random_page_cost,
-							  NULL);
+							  &spc_seq_page_cost);
 
-	/* disk costs --- assume each tuple on a different page */
-	run_cost += spc_random_page_cost * ntuples;
+	/* disk costs */
+	run_cost += spc_random_page_cost * nrandompages + spc_seq_page_cost * nseqpages;
 
 	/* Add scanning CPU costs */
 	get_restriction_qual_cost(root, baserel, param_info, &qpqual_cost);
diff --git a/src/backend/optimizer/path/tidpath.c b/src/backend/optimizer/path/tidpath.c
index 3bb5b8d..9005249 100644
--- a/src/backend/optimizer/path/tidpath.c
+++ b/src/backend/optimizer/path/tidpath.c
@@ -4,13 +4,15 @@
  *	  Routines to determine which TID conditions are usable for scanning
  *	  a given relation, and create TidPaths accordingly.
  *
- * What we are looking for here is WHERE conditions of the form
- * "CTID = pseudoconstant", which can be implemented by just fetching
- * the tuple directly via heap_fetch().  We can also handle OR'd conditions
- * such as (CTID = const1) OR (CTID = const2), as well as ScalarArrayOpExpr
- * conditions of the form CTID = ANY(pseudoconstant_array).  In particular
- * this allows
- *		WHERE ctid IN (tid1, tid2, ...)
+ * What we are looking for here is WHERE conditions of the forms:
+ * - "CTID = pseudoconstant", which can be implemented by just fetching
+ *    the tuple directly via heap_fetch().
+ * - "CTID IN (pseudoconstant, ...)" or "CTID = ANY(pseudoconstant_array)"
+ * - "CTID > pseudoconstant", etc. for >, >=, <, and <=.
+ * - "CTID > pseudoconstant AND CTID < pseudoconstant AND ...", etc.
+ *
+ * We can also handle OR'd conditions of the above form, such as
+ * "(CTID = const1) OR (CTID >= const2) OR CTID IN (...)".
  *
  * We also support "WHERE CURRENT OF cursor" conditions (CurrentOfExpr),
  * which amount to "CTID = run-time-determined-TID".  These could in
@@ -46,32 +48,45 @@
 #include "optimizer/restrictinfo.h"
 
 
-static bool IsTidEqualClause(OpExpr *node, int varno);
+static bool IsTidVar(Var *var, int varno);
+static bool IsTidComparison(OpExpr *node, int varno, Oid expected_comparison_operator);
 static bool IsTidEqualAnyClause(ScalarArrayOpExpr *node, int varno);
+static List *MakeTidRangeQuals(List *quals);
+static List *TidCompoundRangeQualFromExpr(Node *expr, int varno);
 static List *TidQualFromExpr(Node *expr, int varno);
 static List *TidQualFromBaseRestrictinfo(RelOptInfo *rel);
 
 
+static bool
+IsTidVar(Var *var, int varno)
+{
+	return (var->varattno == SelfItemPointerAttributeNumber &&
+			var->vartype == TIDOID &&
+			var->varno == varno &&
+			var->varlevelsup == 0);
+}
+
 /*
  * Check to see if an opclause is of the form
- *		CTID = pseudoconstant
+ *		CTID OP pseudoconstant
  * or
- *		pseudoconstant = CTID
+ *		pseudoconstant OP CTID
+ * where OP is the expected comparison operator.
  *
  * We check that the CTID Var belongs to relation "varno".  That is probably
  * redundant considering this is only applied to restriction clauses, but
  * let's be safe.
  */
 static bool
-IsTidEqualClause(OpExpr *node, int varno)
+IsTidComparison(OpExpr *node, int varno, Oid expected_comparison_operator)
 {
 	Node	   *arg1,
 			   *arg2,
 			   *other;
 	Var		   *var;
 
-	/* Operator must be tideq */
-	if (node->opno != TIDEqualOperator)
+	/* Operator must be the expected one */
+	if (node->opno != expected_comparison_operator)
 		return false;
 	if (list_length(node->args) != 2)
 		return false;
@@ -83,19 +98,13 @@ IsTidEqualClause(OpExpr *node, int varno)
 	if (arg1 && IsA(arg1, Var))
 	{
 		var = (Var *) arg1;
-		if (var->varattno == SelfItemPointerAttributeNumber &&
-			var->vartype == TIDOID &&
-			var->varno == varno &&
-			var->varlevelsup == 0)
+		if (IsTidVar(var, varno))
 			other = arg2;
 	}
 	if (!other && arg2 && IsA(arg2, Var))
 	{
 		var = (Var *) arg2;
-		if (var->varattno == SelfItemPointerAttributeNumber &&
-			var->vartype == TIDOID &&
-			var->varno == varno &&
-			var->varlevelsup == 0)
+		if (IsTidVar(var, varno))
 			other = arg1;
 	}
 	if (!other)
@@ -110,6 +119,17 @@ IsTidEqualClause(OpExpr *node, int varno)
 	return true;				/* success */
 }
 
+#define IsTidEqualClause(node, varno)	IsTidComparison(node, varno, TIDEqualOperator)
+#define IsTidLTClause(node, varno)		IsTidComparison(node, varno, TIDLessOperator)
+#define IsTidLEClause(node, varno)		IsTidComparison(node, varno, TIDLessEqOperator)
+#define IsTidGTClause(node, varno)		IsTidComparison(node, varno, TIDGreaterOperator)
+#define IsTidGEClause(node, varno)		IsTidComparison(node, varno, TIDGreaterEqOperator)
+
+#define IsTidRangeClause(node, varno)	(IsTidLTClause(node, varno) || \
+										 IsTidLEClause(node, varno) || \
+										 IsTidGTClause(node, varno) || \
+										 IsTidGEClause(node, varno))
+
 /*
  * Check to see if a clause is of the form
  *		CTID = ANY (pseudoconstant_array)
@@ -134,10 +154,7 @@ IsTidEqualAnyClause(ScalarArrayOpExpr *node, int varno)
 	{
 		Var		   *var = (Var *) arg1;
 
-		if (var->varattno == SelfItemPointerAttributeNumber &&
-			var->vartype == TIDOID &&
-			var->varno == varno &&
-			var->varlevelsup == 0)
+		if (IsTidVar(var, varno))
 		{
 			/* The other argument must be a pseudoconstant */
 			if (is_pseudo_constant_clause(arg2))
@@ -148,6 +165,42 @@ IsTidEqualAnyClause(ScalarArrayOpExpr *node, int varno)
 	return false;
 }
 
+static List *
+MakeTidRangeQuals(List *quals)
+{
+	if (list_length(quals) == 1)
+		return quals;
+	else
+		return list_make1(make_andclause(quals));
+}
+
+/*
+ * TidCompoundRangeQualFromExpr
+ *
+ * 		Extract a compound CTID range condition from the given qual expression
+ */
+static List *
+TidCompoundRangeQualFromExpr(Node *expr, int varno)
+{
+	ListCell   *l;
+	List	   *found_quals = NIL;
+
+	foreach(l, ((BoolExpr *) expr)->args)
+	{
+		Node	   *clause = (Node *) lfirst(l);
+
+		/* If this clause contains a range qual, add it to the list. */
+		if (is_opclause(clause) && IsTidRangeClause((OpExpr *) clause, varno))
+			found_quals = lappend(found_quals, clause);
+	}
+
+	/* If we found any, make an AND clause out of them. */
+	if (found_quals)
+		return MakeTidRangeQuals(found_quals);
+	else
+		return NIL;
+}
+
 /*
  *	Extract a set of CTID conditions from the given qual expression
  *
@@ -174,6 +227,8 @@ TidQualFromExpr(Node *expr, int varno)
 		/* base case: check for tideq opclause */
 		if (IsTidEqualClause((OpExpr *) expr, varno))
 			rlst = list_make1(expr);
+		else if (IsTidRangeClause((OpExpr *) expr, varno))
+			rlst = list_make1(expr);
 	}
 	else if (expr && IsA(expr, ScalarArrayOpExpr))
 	{
@@ -189,11 +244,18 @@ TidQualFromExpr(Node *expr, int varno)
 	}
 	else if (and_clause(expr))
 	{
-		foreach(l, ((BoolExpr *) expr)->args)
+		/* look for a range qual in the clause */
+		rlst = TidCompoundRangeQualFromExpr(expr, varno);
+
+		/* if no range qual was found, look for any other TID qual */
+		if (!rlst)
 		{
-			rlst = TidQualFromExpr((Node *) lfirst(l), varno);
-			if (rlst)
-				break;
+			foreach(l, ((BoolExpr *) expr)->args)
+			{
+				rlst = TidQualFromExpr((Node *) lfirst(l), varno);
+				if (rlst)
+					break;
+			}
 		}
 	}
 	else if (or_clause(expr))
@@ -217,17 +279,24 @@ TidQualFromExpr(Node *expr, int varno)
 }
 
 /*
- *	Extract a set of CTID conditions from the rel's baserestrictinfo list
+ * Extract a set of CTID conditions from the rel's baserestrictinfo list
+ *
+ * Normally we just use the first RestrictInfo item with some usable quals,
+ * but it's also possible for a good compound range qual, such as
+ * "CTID > ? AND CTID < ?", to be split across multiple items.  So we look for
+ * range quals in all items and use them if any were found.
  */
 static List *
 TidQualFromBaseRestrictinfo(RelOptInfo *rel)
 {
 	List	   *rlst = NIL;
 	ListCell   *l;
+	List	   *found_quals = NIL;
 
 	foreach(l, rel->baserestrictinfo)
 	{
 		RestrictInfo *rinfo = (RestrictInfo *) lfirst(l);
+		Node	   *clause = (Node *) rinfo->clause;
 
 		/*
 		 * If clause must wait till after some lower-security-level
@@ -236,10 +305,23 @@ TidQualFromBaseRestrictinfo(RelOptInfo *rel)
 		if (!restriction_is_securely_promotable(rinfo, rel))
 			continue;
 
-		rlst = TidQualFromExpr((Node *) rinfo->clause, rel->relid);
+		/* If this clause contains a range qual, add it to the list. */
+		if (is_opclause(clause) && IsTidRangeClause((OpExpr *) clause, rel->relid))
+		{
+			found_quals = lappend(found_quals, clause);
+			continue;
+		}
+
+		/* Look for other TID quals. */
+		rlst = TidQualFromExpr((Node *) clause, rel->relid);
 		if (rlst)
 			break;
 	}
+
+	/* Use a range qual if any were found. */
+	if (found_quals)
+		rlst = MakeTidRangeQuals(found_quals);
+
 	return rlst;
 }
 
@@ -264,6 +346,7 @@ create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel)
 
 	tidquals = TidQualFromBaseRestrictinfo(rel);
 
+	/* If there are tidquals, then it's worth generating a tidscan path. */
 	if (tidquals)
 		add_path(rel, (Path *) create_tidscan_path(root, rel, tidquals,
 												   required_outer));
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index da7a920..e2c0bce 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -3081,14 +3081,37 @@ create_tidscan_plan(PlannerInfo *root, TidPath *best_path,
 	}
 
 	/*
-	 * Remove any clauses that are TID quals.  This is a bit tricky since the
-	 * tidquals list has implicit OR semantics.
+	 * Remove the tidquals from the scan clauses if possible, which is
+	 * generally if the tidquals were taken verbatim from any of the
+	 * RelOptInfo items.  If the tidquals don't represent the entire
+	 * RelOptInfo qual, then nothing will be removed.  Note that the tidquals
+	 * is a list; if there is more than one, we have to rebuild the equivalent
+	 * OR clause to find a match.
 	 */
 	ortidquals = tidquals;
 	if (list_length(ortidquals) > 1)
 		ortidquals = list_make1(make_orclause(ortidquals));
 	scan_clauses = list_difference(scan_clauses, ortidquals);
 
+	/*
+	 * In the case of a single compound qual such as "ctid > ? AND ...", the
+	 * various parts may have come from different RestrictInfos.  So remove
+	 * each part separately.  (This doesn't happen for multiple compound
+	 * quals, because the top-level OR clause can't be split over multiple
+	 * RestrictInfos.
+	 */
+	if (list_length(tidquals) == 1)
+	{
+		Node	   *qual = linitial(tidquals);
+
+		if (and_clause(qual))
+		{
+			BoolExpr   *and_qual = ((BoolExpr *) qual);
+
+			scan_clauses = list_difference(scan_clauses, and_qual->args);
+		}
+	}
+
 	scan_plan = make_tidscan(tlist,
 							 scan_clauses,
 							 scan_relid,
diff --git a/src/include/catalog/pg_operator.dat b/src/include/catalog/pg_operator.dat
index ce23c2f..7476916 100644
--- a/src/include/catalog/pg_operator.dat
+++ b/src/include/catalog/pg_operator.dat
@@ -156,15 +156,15 @@
   oprname => '<', oprleft => 'tid', oprright => 'tid', oprresult => 'bool',
   oprcom => '>(tid,tid)', oprnegate => '>=(tid,tid)', oprcode => 'tidlt',
   oprrest => 'scalarltsel', oprjoin => 'scalarltjoinsel' },
-{ oid => '2800', descr => 'greater than',
+{ oid => '2800', oid_symbol => 'TIDGreaterOperator', descr => 'greater than',
   oprname => '>', oprleft => 'tid', oprright => 'tid', oprresult => 'bool',
   oprcom => '<(tid,tid)', oprnegate => '<=(tid,tid)', oprcode => 'tidgt',
   oprrest => 'scalargtsel', oprjoin => 'scalargtjoinsel' },
-{ oid => '2801', descr => 'less than or equal',
+{ oid => '2801', oid_symbol => 'TIDLessEqOperator', descr => 'less than or equal',
   oprname => '<=', oprleft => 'tid', oprright => 'tid', oprresult => 'bool',
   oprcom => '>=(tid,tid)', oprnegate => '>(tid,tid)', oprcode => 'tidle',
   oprrest => 'scalarlesel', oprjoin => 'scalarlejoinsel' },
-{ oid => '2802', descr => 'greater than or equal',
+{ oid => '2802', oid_symbol => 'TIDGreaterEqOperator', descr => 'greater than or equal',
   oprname => '>=', oprleft => 'tid', oprright => 'tid', oprresult => 'bool',
   oprcom => '<=(tid,tid)', oprnegate => '<(tid,tid)', oprcode => 'tidge',
   oprrest => 'scalargesel', oprjoin => 'scalargejoinsel' },
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index ff93910..47c2257 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1492,15 +1492,18 @@ typedef struct BitmapHeapScanState
 	ParallelBitmapHeapState *pstate;
 } BitmapHeapScanState;
 
+typedef struct TidRange TidRange;
+
 /* ----------------
  *	 TidScanState information
  *
- *		tidexprs	   list of TidExpr structs (see nodeTidscan.c)
- *		isCurrentOf    scan has a CurrentOfExpr qual
- *		NumTids		   number of tids in this scan
- *		TidPtr		   index of currently fetched tid
- *		TidList		   evaluated item pointers (array of size NumTids)
- *		htup		   currently-fetched tuple, if any
+ *		tidexprs		list of TidExpr structs (see nodeTidscan.c)
+ *		isCurrentOf		scan has a CurrentOfExpr qual
+ *		NumTidRanges	number of tid ranges in this scan
+ *		TidRangePtr		index of current tid range
+ *		TidRanges		evaluated item pointers (array of size NumTids)
+ *		inScan			currently in a range scan
+ *		htup			currently-fetched tuple, if any
  * ----------------
  */
 typedef struct TidScanState
@@ -1508,10 +1511,11 @@ typedef struct TidScanState
 	ScanState	ss;				/* its first field is NodeTag */
 	List	   *tss_tidexprs;
 	bool		tss_isCurrentOf;
-	int			tss_NumTids;
-	int			tss_TidPtr;
-	ItemPointerData *tss_TidList;
-	HeapTupleData tss_htup;
+	int			tss_NumTidRanges;
+	int			tss_TidRangePtr;
+	TidRange   *tss_TidRanges;
+	bool		tss_inScan;		/* for range scans */
+	HeapTupleData tss_htup;		/* for current-of and single TID fetches */
 } TidScanState;
 
 /* ----------------
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 6fd2420..895849f 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -1228,14 +1228,21 @@ typedef struct BitmapOrPath
 /*
  * TidPath represents a scan by TID
  *
- * tidquals is an implicitly OR'ed list of qual expressions of the form
- * "CTID = pseudoconstant" or "CTID = ANY(pseudoconstant_array)".
+ * tidquals is an implicitly OR'ed list of qual expressions of the forms:
+ *   - "CTID = pseudoconstant"
+ *   - "CTID = ANY(pseudoconstant_array)"
+ *   - "CURRENT OF cursor"
+ *   - "(CTID relop pseudoconstant AND ...)"
+ *
+ * If tidquals is empty, all CTIDs will match (contrary to the usual meaning
+ * of an empty disjunction).
+ *
  * Note they are bare expressions, not RestrictInfos.
  */
 typedef struct TidPath
 {
 	Path		path;
-	List	   *tidquals;		/* qual(s) involving CTID = something */
+	List	   *tidquals;
 } TidPath;
 
 /*
diff --git a/src/test/regress/expected/tidscan.out b/src/test/regress/expected/tidscan.out
index 521ed1b..8083909 100644
--- a/src/test/regress/expected/tidscan.out
+++ b/src/test/regress/expected/tidscan.out
@@ -177,3 +177,253 @@ UPDATE tidscan SET id = -id WHERE CURRENT OF c RETURNING *;
 ERROR:  cursor "c" is not positioned on a row
 ROLLBACK;
 DROP TABLE tidscan;
+-- tests for tidrangescans
+CREATE TABLE tidrangescan(id integer, data text);
+INSERT INTO tidrangescan SELECT i,repeat('x', 100) FROM generate_series(1,1000) AS s(i);
+DELETE FROM tidrangescan WHERE substring(ctid::text from ',(\d+)\)')::integer > 10 OR substring(ctid::text from '\((\d+),')::integer >= 10;;
+VACUUM tidrangescan;
+-- range scans with upper bound
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid < '(1,0)';
+            QUERY PLAN             
+-----------------------------------
+ Tid Scan on tidrangescan
+   TID Cond: (ctid < '(1,0)'::tid)
+(2 rows)
+
+SELECT ctid FROM tidrangescan WHERE ctid < '(1,0)';
+  ctid  
+--------
+ (0,1)
+ (0,2)
+ (0,3)
+ (0,4)
+ (0,5)
+ (0,6)
+ (0,7)
+ (0,8)
+ (0,9)
+ (0,10)
+(10 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid <= '(1,5)';
+             QUERY PLAN             
+------------------------------------
+ Tid Scan on tidrangescan
+   TID Cond: (ctid <= '(1,5)'::tid)
+(2 rows)
+
+SELECT ctid FROM tidrangescan WHERE ctid <= '(1,5)';
+  ctid  
+--------
+ (0,1)
+ (0,2)
+ (0,3)
+ (0,4)
+ (0,5)
+ (0,6)
+ (0,7)
+ (0,8)
+ (0,9)
+ (0,10)
+ (1,1)
+ (1,2)
+ (1,3)
+ (1,4)
+ (1,5)
+(15 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid < '(0,0)';
+            QUERY PLAN             
+-----------------------------------
+ Tid Scan on tidrangescan
+   TID Cond: (ctid < '(0,0)'::tid)
+(2 rows)
+
+SELECT ctid FROM tidrangescan WHERE ctid < '(0,0)';
+ ctid 
+------
+(0 rows)
+
+-- range scans with lower bound
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid > '(9,8)';
+            QUERY PLAN             
+-----------------------------------
+ Tid Scan on tidrangescan
+   TID Cond: (ctid > '(9,8)'::tid)
+(2 rows)
+
+SELECT ctid FROM tidrangescan WHERE ctid > '(9,8)';
+  ctid  
+--------
+ (9,9)
+ (9,10)
+(2 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE '(9,8)' < ctid;
+            QUERY PLAN             
+-----------------------------------
+ Tid Scan on tidrangescan
+   TID Cond: ('(9,8)'::tid < ctid)
+(2 rows)
+
+SELECT ctid FROM tidrangescan WHERE '(9,8)' < ctid;
+  ctid  
+--------
+ (9,9)
+ (9,10)
+(2 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid >= '(9,8)';
+             QUERY PLAN             
+------------------------------------
+ Tid Scan on tidrangescan
+   TID Cond: (ctid >= '(9,8)'::tid)
+(2 rows)
+
+SELECT ctid FROM tidrangescan WHERE ctid >= '(9,8)';
+  ctid  
+--------
+ (9,8)
+ (9,9)
+ (9,10)
+(3 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid >= '(100,0)';
+              QUERY PLAN              
+--------------------------------------
+ Tid Scan on tidrangescan
+   TID Cond: (ctid >= '(100,0)'::tid)
+(2 rows)
+
+SELECT ctid FROM tidrangescan WHERE ctid >= '(100,0)';
+ ctid 
+------
+(0 rows)
+
+-- range scans with both bounds
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid > '(4,4)' AND '(4,7)' >= ctid;
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Tid Scan on tidrangescan
+   TID Cond: ((ctid > '(4,4)'::tid) AND ('(4,7)'::tid >= ctid))
+(2 rows)
+
+SELECT ctid FROM tidrangescan WHERE ctid > '(4,4)' AND '(4,7)' >= ctid;
+ ctid  
+-------
+ (4,5)
+ (4,6)
+ (4,7)
+(3 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE '(4,7)' >= ctid AND ctid > '(4,4)';
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Tid Scan on tidrangescan
+   TID Cond: (('(4,7)'::tid >= ctid) AND (ctid > '(4,4)'::tid))
+(2 rows)
+
+SELECT ctid FROM tidrangescan WHERE '(4,7)' >= ctid AND ctid > '(4,4)';
+ ctid  
+-------
+ (4,5)
+ (4,6)
+ (4,7)
+(3 rows)
+
+-- combinations
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE '(4,7)' >= ctid AND ctid > '(4,4)' OR ctid = '(2,2)';
+                                        QUERY PLAN                                         
+-------------------------------------------------------------------------------------------
+ Tid Scan on tidrangescan
+   TID Cond: ((('(4,7)'::tid >= ctid) AND (ctid > '(4,4)'::tid)) OR (ctid = '(2,2)'::tid))
+(2 rows)
+
+SELECT ctid FROM tidrangescan WHERE '(4,7)' >= ctid AND ctid > '(4,4)' OR ctid = '(2,2)';
+ ctid  
+-------
+ (2,2)
+ (4,5)
+ (4,6)
+ (4,7)
+(4 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE '(4,7)' >= ctid AND ctid > '(4,4)' OR ctid = '(2,2)' AND data = 'foo';
+                                                     QUERY PLAN                                                     
+--------------------------------------------------------------------------------------------------------------------
+ Tid Scan on tidrangescan
+   TID Cond: ((('(4,7)'::tid >= ctid) AND (ctid > '(4,4)'::tid)) OR (ctid = '(2,2)'::tid))
+   Filter: ((('(4,7)'::tid >= ctid) AND (ctid > '(4,4)'::tid)) OR ((ctid = '(2,2)'::tid) AND (data = 'foo'::text)))
+(3 rows)
+
+SELECT ctid FROM tidrangescan WHERE '(4,7)' >= ctid AND ctid > '(4,4)' OR ctid = '(2,2)' AND data = 'foo';
+ ctid  
+-------
+ (4,5)
+ (4,6)
+ (4,7)
+(3 rows)
+
+-- extreme offsets
+SELECT ctid FROM tidrangescan where ctid > '(0,65535)' AND ctid < '(1,0)' LIMIT 1;
+ ctid 
+------
+(0 rows)
+
+SELECT ctid FROM tidrangescan where ctid < '(0,0)' LIMIT 1;
+ ctid 
+------
+(0 rows)
+
+-- make sure ranges are combined correctly
+SELECT COUNT(*) FROM tidrangescan WHERE ctid < '(0,3)' OR ctid >= '(0,2)' AND ctid <= '(0,5)';
+ count 
+-------
+     5
+(1 row)
+
+SELECT COUNT(*) FROM tidrangescan WHERE ctid <= '(0,10)' OR ctid >= '(0,2)' AND ctid <= '(0,5)';
+ count 
+-------
+    10
+(1 row)
+
+-- empty table
+CREATE TABLE tidrangescan_empty(id integer, data text);
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan_empty WHERE ctid < '(1, 0)';
+            QUERY PLAN             
+-----------------------------------
+ Tid Scan on tidrangescan_empty
+   TID Cond: (ctid < '(1,0)'::tid)
+(2 rows)
+
+SELECT ctid FROM tidrangescan_empty WHERE ctid < '(1, 0)';
+ ctid 
+------
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan_empty WHERE ctid > '(9, 0)';
+            QUERY PLAN             
+-----------------------------------
+ Tid Scan on tidrangescan_empty
+   TID Cond: (ctid > '(9,0)'::tid)
+(2 rows)
+
+SELECT ctid FROM tidrangescan_empty WHERE ctid > '(9, 0)';
+ ctid 
+------
+(0 rows)
+
diff --git a/src/test/regress/sql/tidscan.sql b/src/test/regress/sql/tidscan.sql
index a8472e0..02b094a 100644
--- a/src/test/regress/sql/tidscan.sql
+++ b/src/test/regress/sql/tidscan.sql
@@ -64,3 +64,79 @@ UPDATE tidscan SET id = -id WHERE CURRENT OF c RETURNING *;
 ROLLBACK;
 
 DROP TABLE tidscan;
+
+-- tests for tidrangescans
+
+CREATE TABLE tidrangescan(id integer, data text);
+
+INSERT INTO tidrangescan SELECT i,repeat('x', 100) FROM generate_series(1,1000) AS s(i);
+DELETE FROM tidrangescan WHERE substring(ctid::text from ',(\d+)\)')::integer > 10 OR substring(ctid::text from '\((\d+),')::integer >= 10;;
+VACUUM tidrangescan;
+
+-- range scans with upper bound
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid < '(1,0)';
+SELECT ctid FROM tidrangescan WHERE ctid < '(1,0)';
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid <= '(1,5)';
+SELECT ctid FROM tidrangescan WHERE ctid <= '(1,5)';
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid < '(0,0)';
+SELECT ctid FROM tidrangescan WHERE ctid < '(0,0)';
+
+-- range scans with lower bound
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid > '(9,8)';
+SELECT ctid FROM tidrangescan WHERE ctid > '(9,8)';
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE '(9,8)' < ctid;
+SELECT ctid FROM tidrangescan WHERE '(9,8)' < ctid;
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid >= '(9,8)';
+SELECT ctid FROM tidrangescan WHERE ctid >= '(9,8)';
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid >= '(100,0)';
+SELECT ctid FROM tidrangescan WHERE ctid >= '(100,0)';
+
+-- range scans with both bounds
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE ctid > '(4,4)' AND '(4,7)' >= ctid;
+SELECT ctid FROM tidrangescan WHERE ctid > '(4,4)' AND '(4,7)' >= ctid;
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE '(4,7)' >= ctid AND ctid > '(4,4)';
+SELECT ctid FROM tidrangescan WHERE '(4,7)' >= ctid AND ctid > '(4,4)';
+
+-- combinations
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE '(4,7)' >= ctid AND ctid > '(4,4)' OR ctid = '(2,2)';
+SELECT ctid FROM tidrangescan WHERE '(4,7)' >= ctid AND ctid > '(4,4)' OR ctid = '(2,2)';
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan WHERE '(4,7)' >= ctid AND ctid > '(4,4)' OR ctid = '(2,2)' AND data = 'foo';
+SELECT ctid FROM tidrangescan WHERE '(4,7)' >= ctid AND ctid > '(4,4)' OR ctid = '(2,2)' AND data = 'foo';
+
+-- extreme offsets
+SELECT ctid FROM tidrangescan where ctid > '(0,65535)' AND ctid < '(1,0)' LIMIT 1;
+SELECT ctid FROM tidrangescan where ctid < '(0,0)' LIMIT 1;
+
+-- make sure ranges are combined correctly
+SELECT COUNT(*) FROM tidrangescan WHERE ctid < '(0,3)' OR ctid >= '(0,2)' AND ctid <= '(0,5)';
+
+SELECT COUNT(*) FROM tidrangescan WHERE ctid <= '(0,10)' OR ctid >= '(0,2)' AND ctid <= '(0,5)';
+
+-- empty table
+CREATE TABLE tidrangescan_empty(id integer, data text);
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan_empty WHERE ctid < '(1, 0)';
+SELECT ctid FROM tidrangescan_empty WHERE ctid < '(1, 0)';
+
+EXPLAIN (COSTS OFF)
+SELECT ctid FROM tidrangescan_empty WHERE ctid > '(9, 0)';
+SELECT ctid FROM tidrangescan_empty WHERE ctid > '(9, 0)';
-- 
2.7.4

