From a792d50306718ae7904b07d03be565b4203d075c Mon Sep 17 00:00:00 2001
From: Peter Geoghegan <pg@bowt.ie>
Date: Tue, 30 Jun 2020 16:29:27 -0700
Subject: [PATCH v3] Add delete deduplication to nbtree.

Repurpose deduplication infrastructure to delete items in indexes at the
point where we'd usually have to split the page, even when they don't
have their LP_DEAD bits set.  Testing has shown that this is almost
completely effective at preventing "version index bloat" from non-HOT
updates, provided there are no long running transactions.

This is primarily valuable with leaf pages that contain mostly-distinct
index tuples, particularly with unique indexes.  It is intended to
complement deduplication.  Heuristics are used to guess which index
tuples are likely to point to no longer needed old table row versions.

Note that INCLUDE indexes support the optimization.
---
 src/include/access/genam.h               |  15 +
 src/include/access/heapam.h              |   3 +-
 src/include/access/nbtree.h              |   5 +-
 src/include/access/tableam.h             |  43 ++-
 src/include/executor/executor.h          |   3 +-
 src/backend/access/heap/heapam.c         |  12 +-
 src/backend/access/heap/heapam_handler.c |   5 +-
 src/backend/access/nbtree/README         |  70 +++-
 src/backend/access/nbtree/nbtdedup.c     | 429 +++++++++++++++++++++--
 src/backend/access/nbtree/nbtinsert.c    |  40 ++-
 src/backend/access/nbtree/nbtsort.c      |  12 +-
 src/backend/access/nbtree/nbtxlog.c      |   4 +-
 src/backend/access/table/tableam.c       | 243 ++++++++++++-
 src/backend/commands/copy.c              |   5 +-
 src/backend/executor/execIndexing.c      |  41 ++-
 src/backend/executor/execReplication.c   |   4 +-
 src/backend/executor/nodeModifyTable.c   |  11 +-
 17 files changed, 865 insertions(+), 80 deletions(-)

diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index 68d90f5141..7002da0716 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -108,10 +108,25 @@ typedef struct ParallelIndexScanDescData *ParallelIndexScanDesc;
  * call is made with UNIQUE_CHECK_EXISTING.  The tuple is already in the
  * index in this case, so it should not be inserted again.  Rather, just
  * check for conflicting live tuples (possibly blocking).
+ *
+ * UNIQUE_CHECK_NO indicates the absence of any unique checking.
+ * UNIQUE_CHECK_NO_WITH_UNCHANGED is a variant of UNIQUE_CHECK_NO that
+ * indicates that the index tuple comes from an UPDATE that did not modify
+ * the row in respect of any columns that are indexed.  The implementation
+ * requires a successor version, but there is no logical change.  Some
+ * index access AMs can use this as hint that can trigger optimizations.
+ *
+ * XXX: Adding UNIQUE_CHECK_NO_WITH_UNCHANGED like this kind of makes
+ * sense, since it's pretty natural to leave it up to index AMs to figure
+ * it out with unique indexes.  But what about when we insert NULLs into a
+ * unique index?  Isn't that case UNIQUE_CHECK_YES, and yet also a thing
+ * that nbtree pretty much treats as UNIQUE_CHECK_NO once it sees that the
+ * index tuple has NULLs?
  */
 typedef enum IndexUniqueCheck
 {
 	UNIQUE_CHECK_NO,			/* Don't do any uniqueness checking */
+	UNIQUE_CHECK_NO_WITH_UNCHANGED, /* "No logical change" duplicate */
 	UNIQUE_CHECK_YES,			/* Enforce uniqueness at insertion time */
 	UNIQUE_CHECK_PARTIAL,		/* Test uniqueness, but no error */
 	UNIQUE_CHECK_EXISTING		/* Check if existing tuple is unique */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 92b19dba32..d950083a7d 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -148,7 +148,8 @@ extern void heap_abort_speculative(Relation relation, ItemPointer tid);
 extern TM_Result heap_update(Relation relation, ItemPointer otid,
 							 HeapTuple newtup,
 							 CommandId cid, Snapshot crosscheck, bool wait,
-							 struct TM_FailureData *tmfd, LockTupleMode *lockmode);
+							 struct TM_FailureData *tmfd, LockTupleMode *lockmode,
+							 Bitmapset **modified_attrs_hint);
 extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
 								 CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
 								 bool follow_update,
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 65d9698b89..33d9b429c0 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -1029,11 +1029,12 @@ extern void _bt_parallel_advance_array_keys(IndexScanDesc scan);
  */
 extern void _bt_dedup_one_page(Relation rel, Buffer buf, Relation heapRel,
 							   IndexTuple newitem, Size newitemsz,
-							   bool checkingunique);
+							   bool checkingunique, bool nologicalchange,
+							   bool allequalimage);
 extern void _bt_dedup_start_pending(BTDedupState state, IndexTuple base,
 									OffsetNumber baseoff);
 extern bool _bt_dedup_save_htid(BTDedupState state, IndexTuple itup);
-extern Size _bt_dedup_finish_pending(Page newpage, BTDedupState state);
+extern Size _bt_dedup_merge_finish_pending(Page newpage, BTDedupState state);
 extern IndexTuple _bt_form_posting(IndexTuple base, ItemPointer htids,
 								   int nhtids);
 extern void _bt_update_posting(BTVacuumPosting vacposting);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 387eb34a61..d545a4abdf 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -128,6 +128,17 @@ typedef struct TM_FailureData
 	bool		traversed;
 } TM_FailureData;
 
+/*
+ * State used by table_index_batch_check() to perform "bottom up" deletion of
+ * duplicate index tuples
+ */
+typedef struct TM_IndexDelete
+{
+	OffsetNumber ioffnum;		/* Index am identifies entries with this */
+	ItemPointerData tid;		/* table TID from index tuple */
+	bool		isdead;			/* Is tuple dead? */
+} TM_IndexDelete;
+
 /* "options" flag bits for table_tuple_insert */
 /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */
 #define TABLE_INSERT_SKIP_FSM		0x0002
@@ -396,7 +407,8 @@ typedef struct TableAmRoutine
 								 bool wait,
 								 TM_FailureData *tmfd,
 								 LockTupleMode *lockmode,
-								 bool *update_indexes);
+								 bool *update_indexes,
+								 Bitmapset **modified_attrs_hint);
 
 	/* see table_tuple_lock() for reference about parameters */
 	TM_Result	(*tuple_lock) (Relation rel,
@@ -1041,16 +1053,32 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
 }
 
 /*
- * This is a convenience wrapper around table_index_fetch_tuple() which
- * returns whether there are table tuple items corresponding to an index
- * entry.  This likely is only useful to verify if there's a conflict in a
- * unique index.
+ * These are convenience wrappers around table_index_fetch_tuple() which
+ * indicate whether there are table tuple items corresponding to an index
+ * entry.  Can be used to verify if there's a conflict in a unique index.
+ *
+ * table_index_batch_check() is a variant that is specialized to garbage
+ * collection of dead tuples in index access methods.  Duplicates are
+ * commonly caused by MVCC version churn when an optimization like
+ * heapam's HOT cannot be applied.  It can make sense to opportunistically
+ * guess that many index tuples are dead versions, particularly in unique
+ * indexes.
+ *
+ * Note that table_index_batch_check() sorts the duptids array so that the
+ * order of access is optimized.  Callers need to be able to deal with
+ * that.
  */
 extern bool table_index_fetch_tuple_check(Relation rel,
 										  ItemPointer tid,
 										  Snapshot snapshot,
 										  bool *all_dead);
 
+extern int	table_index_batch_check(Relation rel,
+									TM_IndexDelete *duptids,
+									int nduptids,
+									Snapshot snapshot,
+									int nkillsneeded,
+									int *nblocksaccessed);
 
 /* ------------------------------------------------------------------------
  * Functions for non-modifying operations on individual tuples
@@ -1311,12 +1339,13 @@ static inline TM_Result
 table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
 				   CommandId cid, Snapshot snapshot, Snapshot crosscheck,
 				   bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
-				   bool *update_indexes)
+				   bool *update_indexes, Bitmapset **modified_attrs_hint)
 {
 	return rel->rd_tableam->tuple_update(rel, otid, slot,
 										 cid, snapshot, crosscheck,
 										 wait, tmfd,
-										 lockmode, update_indexes);
+										 lockmode, update_indexes,
+										 modified_attrs_hint);
 }
 
 /*
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index b7978cd22e..f056a7b124 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -579,7 +579,8 @@ extern void ExecCloseIndices(ResultRelInfo *resultRelInfo);
 extern List *ExecInsertIndexTuples(ResultRelInfo *resultRelInfo,
 								   TupleTableSlot *slot, EState *estate,
 								   bool noDupErr,
-								   bool *specConflict, List *arbiterIndexes);
+								   bool *specConflict, List *arbiterIndexes,
+								   Bitmapset *modified_attrs_hint);
 extern bool ExecCheckIndexConstraints(ResultRelInfo *resultRelInfo,
 									  TupleTableSlot *slot,
 									  EState *estate, ItemPointer conflictTid,
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 1585861a02..fa7ca33289 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2892,7 +2892,8 @@ simple_heap_delete(Relation relation, ItemPointer tid)
 TM_Result
 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, LockTupleMode *lockmode)
+			TM_FailureData *tmfd, LockTupleMode *lockmode,
+			Bitmapset **modified_attrs_hint)
 {
 	TM_Result	result;
 	TransactionId xid = GetCurrentTransactionId();
@@ -3758,10 +3759,15 @@ l2:
 	if (old_key_tuple != NULL && old_key_copied)
 		heap_freetuple(old_key_tuple);
 
+	/* Save for no logical changes hint when non-HOT update performed */
+	if (!use_hot_update && modified_attrs_hint)
+		*modified_attrs_hint = modified_attrs;
+	else
+		bms_free(modified_attrs);
+
 	bms_free(hot_attrs);
 	bms_free(key_attrs);
 	bms_free(id_attrs);
-	bms_free(modified_attrs);
 	bms_free(interesting_attrs);
 
 	return TM_Ok;
@@ -3891,7 +3897,7 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
 	result = heap_update(relation, otid, tup,
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
-						 &tmfd, &lockmode);
+						 &tmfd, &lockmode, NULL);
 	switch (result)
 	{
 		case TM_SelfModified:
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index dcaea7135f..f32ed0a5f2 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -314,7 +314,8 @@ static TM_Result
 heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 					CommandId cid, Snapshot snapshot, Snapshot crosscheck,
 					bool wait, TM_FailureData *tmfd,
-					LockTupleMode *lockmode, bool *update_indexes)
+					LockTupleMode *lockmode, bool *update_indexes,
+					Bitmapset **modified_attrs_hint)
 {
 	bool		shouldFree = true;
 	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
@@ -325,7 +326,7 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 	tuple->t_tableOid = slot->tts_tableOid;
 
 	result = heap_update(relation, otid, tuple, cid, crosscheck, wait,
-						 tmfd, lockmode);
+						 tmfd, lockmode, modified_attrs_hint);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	/*
diff --git a/src/backend/access/nbtree/README b/src/backend/access/nbtree/README
index 9692e4cdf6..8560c5f6c3 100644
--- a/src/backend/access/nbtree/README
+++ b/src/backend/access/nbtree/README
@@ -807,7 +807,75 @@ Deduplication in unique indexes helps to prevent these pathological page
 splits.  Storing duplicates in a space efficient manner is not the goal,
 since in the long run there won't be any duplicates anyway.  Rather, we're
 buying time for standard garbage collection mechanisms to run before a
-page split is needed.
+page split is needed.  Also, the deduplication pass performs a targeted
+form of opportunistic deletion for unique indexes with version churn
+duplicates, as well as in cases where an UPDATE statement did not
+logically modify any indexed column, but nevertheless requires a successor
+index tuple.  The latter case happens when tableam optimizations such as
+heapam's HOT cannot be applied.  (We don't want to distinguish between
+version churn from UPDATEs and version churn from related INSERTs and
+DELETEs within a unique index.)
+
+The deduplication module usually opportunistically deletes whatever
+duplicates happen to be present on the page before moving on to
+deduplication proper, since in general some duplicates are likely to
+already be dead to everybody.  This mechanism is quite similar to
+on-the-fly deletion of index tuples that will already have failed to
+prevent a page split by the time deduplication is considered.  The main
+difference is that the tuples that get deleted are not opportunistically
+marked LP_DEAD by transactions that had to read the tuples in any case.
+
+The implementation must weigh the need to avoid a page split against the
+extra work performed with an exclusive buffer lock held.  It's possible to
+make this trade-off sensibly despite the uncertainty about versioning and
+update chains within nbtree.  In a unique index it's clear that there can
+only be one most recent committed version for any given value, which makes
+it certain that we'll delete some of the old versions --- at least in the
+absence of either a long running transaction that holds back the xmin
+horizon, and barring extreme churn concentrated in one part of the key
+space.
+
+Deduplication-deletion in non-unique indexes is trickier (the
+implementation is almost the same, but the justification is more
+complicated).  In general there is nothing that assures us that there
+cannot be many logical rows that all have the same value in respect of an
+indexed column, which will cause us to waste time trying to find "old dead
+versions" among the duplicates that are actually distinct logical rows.
+We assume that all indexes work more or less like a unique index.  This
+works better than you'd think.  We at least know that there is some chance
+of UPDATE version churn in affected pages; the tuple we're trying to
+insert on the page at the point that this happens certainly originated
+that way, so there is a good chance that the same is true of existing,
+committed tuples.  We're only willing to access a small number of
+heap/table pages to determine if our guess is correct, so if we're totally
+wrong then we'll have accessed no more than 2 or so heap/table pages.
+Finally, and perhaps most importantly, we'll learn from our mistake.  The
+natural consequence of failing to deduplicate-delete is to do a
+deduplicate-merge pass.  That will merge together the duplicate index
+tuples -- we surmise that these correspond to multiple extant logical
+rows.  If and when there is another deduplication-delete pass on the same
+page, we'll skip over the posting list tuple.
+
+A posting list tuple may not actually point to one distinct logical row
+per TID, of course.  Even when our inference is totally wrong it still
+seems like a good idea to skip posting lists like this.  In general, the
+deduplication-deletion algorithm aims to maximize the chances of deleting
+_some_ tuples, while paying only a low fixed cost to access visibility
+information from the table.  In general it's possible that deleting just
+one or two index tuples will buy us many hours or days before the question
+of splitting the same leaf page comes up again -- and VACUUM may well
+visit the page in that time anyway.  If there really is intense pressure
+against the page, with many deduplication-delete passes occurring only
+milliseconds apart, then a version-driven page split is practically
+guaranteed to occur before long.  This resolves the situation.
+
+Negative feedback (such as failing to dedup-delete any tuples) is not
+really undesirable.  At worst it is an unavoidable part of how the
+algorithm works.  We require that our various approaches to handling an
+overflowing page (due partially or entirely to version churn) compete to
+determine how best to handle the problem in a localized fashion.  We
+expect to converge on a stable and roughly optimal behavior at each part
+of the key space in each index affected by version churn.
 
 Unique index leaf pages only get a deduplication pass when an insertion
 (that might have to split the page) observed an existing duplicate on the
diff --git a/src/backend/access/nbtree/nbtdedup.c b/src/backend/access/nbtree/nbtdedup.c
index f6be865b17..9a2b2b637d 100644
--- a/src/backend/access/nbtree/nbtdedup.c
+++ b/src/backend/access/nbtree/nbtdedup.c
@@ -16,13 +16,24 @@
 
 #include "access/nbtree.h"
 #include "access/nbtxlog.h"
+#include "access/tableam.h"
 #include "miscadmin.h"
 #include "utils/rel.h"
 
+static bool _bt_dedup_delete_one_page(Relation rel, Buffer buf,
+									  Relation heapRel, Size newitemsz,
+									  bool checkingunique,
+									  bool logicallymodified,
+									  bool *dedupmerge);
+static void _bt_dedup_merge_one_page(Relation rel, Buffer buf,
+									 Relation heapRel, IndexTuple newitem,
+									 Size newitemsz, bool checkingunique);
+static void _bt_dedup_delete_finish_pending(BTDedupState state);
 static bool _bt_do_singleval(Relation rel, Page page, BTDedupState state,
 							 OffsetNumber minoff, IndexTuple newitem);
 static void _bt_singleval_fillfactor(Page page, BTDedupState state,
 									 Size newitemsz);
+static int	_bt_offsetnumbercmp(const void *arg1, const void *arg2);
 #ifdef USE_ASSERT_CHECKING
 static bool _bt_posting_valid(IndexTuple posting);
 #endif
@@ -32,16 +43,12 @@ static bool _bt_posting_valid(IndexTuple posting);
  * if we cannot successfully free at least newitemsz (we also need space for
  * newitem's line pointer, which isn't included in caller's newitemsz).
  *
- * The general approach taken here is to perform as much deduplication as
- * possible to free as much space as possible.  Note, however, that "single
- * value" strategy is sometimes used for !checkingunique callers, in which
- * case deduplication will leave a few tuples untouched at the end of the
- * page.  The general idea is to prepare the page for an anticipated page
- * split that uses nbtsplitloc.c's "single value" strategy to determine a
- * split point.  (There is no reason to deduplicate items that will end up on
- * the right half of the page after the anticipated page split; better to
- * handle those if and when the anticipated right half page gets its own
- * deduplication pass, following further inserts of duplicates.)
+ * There are two types of deduplication pass: The merge deduplication pass,
+ * where we merge together duplicate index tuples into a new posting list, and
+ * the delete deduplication pass, where old garbage version index tuples are
+ * deleted based on visibility information that we fetch from the table.  We
+ * generally expect to perform only one type of deduplication pass per call
+ * here, but it's possible that we'll end up doing both.
  *
  * This function should be called during insertion, when the page doesn't have
  * enough space to fit an incoming newitem.  If the BTP_HAS_GARBAGE page flag
@@ -54,27 +61,23 @@ static bool _bt_posting_valid(IndexTuple posting);
  */
 void
 _bt_dedup_one_page(Relation rel, Buffer buf, Relation heapRel,
-				   IndexTuple newitem, Size newitemsz, bool checkingunique)
+				   IndexTuple newitem, Size newitemsz, bool checkingunique,
+				   bool logicallymodified, bool allequalimage)
 {
 	OffsetNumber offnum,
 				minoff,
 				maxoff;
 	Page		page = BufferGetPage(buf);
 	BTPageOpaque opaque;
-	Page		newpage;
 	OffsetNumber deletable[MaxIndexTuplesPerPage];
-	BTDedupState state;
 	int			ndeletable = 0;
-	Size		pagesaving = 0;
-	bool		singlevalstrat = false;
-	int			nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
 
 	/*
 	 * We can't assume that there are no LP_DEAD items.  For one thing, VACUUM
 	 * will clear the BTP_HAS_GARBAGE hint without reliably removing items
 	 * that are marked LP_DEAD.  We don't want to unnecessarily unset LP_DEAD
-	 * bits when deduplicating items.  Allowing it would be correct, though
-	 * wasteful.
+	 * bits when deduplicating items by merging.  Allowing it would be
+	 * correct, though wasteful.
 	 */
 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 	minoff = P_FIRSTDATAKEY(opaque);
@@ -99,18 +102,330 @@ _bt_dedup_one_page(Relation rel, Buffer buf, Relation heapRel,
 		 */
 		if (PageGetFreeSpace(page) >= newitemsz)
 			return;
-
-		/*
-		 * Reconsider number of items on page, in case _bt_delitems_delete()
-		 * managed to delete an item or two
-		 */
-		minoff = P_FIRSTDATAKEY(opaque);
-		maxoff = PageGetMaxOffsetNumber(page);
 	}
 
+	minoff = maxoff = InvalidOffsetNumber;	/* Invalidate */
+
+	/*
+	 * We're willing to do dedup deletion with a unique index that is not
+	 * generally safe for deduplication (though only when deduplicate_items
+	 * storage param is not explicitly set to 'off', which our caller checks
+	 * for us).
+	 *
+	 * The logic used by the !checkingunique _bt_dedup_delete_one_page() case
+	 * relies on regular deduplication passes occurring, and merging together
+	 * index entries that point to distinct logical table rows that happen to
+	 * have the same key value (this might not happen immediately, but it
+	 * should happen before too long).  We're not willing to deduplicate when
+	 * the index isn't a unique index and isn't an index that is generally
+	 * safe for deduplication.  Exit early if we see that.
+	 */
+	if (!allequalimage && !checkingunique)
+		return;
+
 	/* Passed-in newitemsz is MAXALIGNED but does not include line pointer */
 	newitemsz += sizeof(ItemIdData);
 
+	if (checkingunique || !logicallymodified)
+	{
+		bool		dedupmerge = true;
+
+		/* Perform delete deduplication pass */
+		if (_bt_dedup_delete_one_page(rel, buf, heapRel, newitemsz,
+									  checkingunique, logicallymodified,
+									  &dedupmerge))
+			return;
+
+		/*
+		 * _bt_dedup_delete_one_page() may occasionally indicate no
+		 * duplicates, in which case we should give up now
+		 */
+		if (!dedupmerge)
+			return;
+
+		/* Fall back on merge deduplication.  This happens infrequently. */
+	}
+
+	/*
+	 * Perform merge deduplication pass, though only when index is
+	 * allequalimage -- otherwise it's not safe
+	 */
+	if (allequalimage)
+		_bt_dedup_merge_one_page(rel, buf, heapRel, newitem, newitemsz,
+								 checkingunique);
+}
+
+/*
+ * Perform a delete deduplication pass.
+ *
+ * See if duplicate index tuples are eligible to be deleted, even though they
+ * don't have their LP_DEAD bit set already.  Give up if we have to access
+ * more than a few heap pages before we can free enough space to fit newitem.
+ *
+ * Note: Caller should have already deleted all existing items with their
+ * LP_DEAD bits set.
+ *
+ * FIXME: Be less eager with tuples that contain NULLs for checkingunique
+ * callers, since NULLs can be duplicates without that signaling anything
+ * about version churn.  Just because we're checkingunique (which implies that
+ * incoming newitem isn't a NULL) doesn't mean there aren't lots of other
+ * NULLs on the page.
+ */
+static bool
+_bt_dedup_delete_one_page(Relation rel, Buffer buf, Relation heapRel,
+						  Size newitemsz, bool checkingunique,
+						  bool logicallymodified, bool *dedupmerge)
+{
+	OffsetNumber offnum,
+				minoff,
+				maxoff;
+	Size		freespace;
+	Page		page = BufferGetPage(buf);
+	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+	BTDedupState state;
+	TM_IndexDelete *duptids;
+	int			nduptids;
+	int			nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
+
+	state = (BTDedupState) palloc(sizeof(BTDedupStateData));
+	state->deduplicate = true;
+	state->nmaxitems = 0;
+	/* Final "posting list" size should not restrict anything */
+	state->maxpostingsize = BLCKSZ;
+	state->base = NULL;
+	state->baseoff = InvalidOffsetNumber;
+	state->basetupsize = 0;
+	state->htids = palloc(state->maxpostingsize);
+	state->nhtids = 0;
+	state->nitems = 0;
+	state->phystupsize = 0;
+	state->nintervals = 0;
+
+	minoff = P_FIRSTDATAKEY(opaque);
+	maxoff = PageGetMaxOffsetNumber(page);
+	for (offnum = minoff;
+		 offnum <= maxoff;
+		 offnum = OffsetNumberNext(offnum))
+	{
+		ItemId		itemid = PageGetItemId(page, offnum);
+		IndexTuple	itup = (IndexTuple) PageGetItem(page, itemid);
+
+		Assert(!ItemIdIsDead(itemid));
+
+		if (offnum == minoff)
+		{
+			_bt_dedup_start_pending(state, itup, offnum);
+		}
+		else if (_bt_keep_natts_fast(rel, state->base, itup) > nkeyatts &&
+				 _bt_dedup_save_htid(state, itup))
+		{
+
+		}
+		else
+		{
+			_bt_dedup_delete_finish_pending(state);
+
+			/* itup starts new pending posting list */
+			_bt_dedup_start_pending(state, itup, offnum);
+		}
+	}
+	/* Handle the final interval, which is still pending */
+	_bt_dedup_delete_finish_pending(state);
+
+	if (state->nintervals == 0)
+	{
+		/* No duplicates */
+		pfree(state->htids);
+		pfree(state);
+		/* Caller should avoid deduplication-by-merging pass */
+		*dedupmerge = false;
+		return false;
+	}
+
+	/*
+	 * Accumulate an array of duplicate TIDs to pass to heapam from dedup
+	 * intervals
+	 */
+	nduptids = 0;
+	duptids = palloc0(maxoff * sizeof(TM_IndexDelete));
+	for (int i = 0; i < state->nintervals; i++)
+	{
+		BTDedupInterval interval = state->intervals[i];
+
+		Assert(interval.nitems > 0);
+		/* Iterate through tuples of given interval/value */
+		for (int j = 0; j < interval.nitems; j++)
+		{
+			OffsetNumber dupoffnum = interval.baseoff + j;
+			ItemId		itemid = PageGetItemId(page, dupoffnum);
+			IndexTuple	itup = (IndexTuple) PageGetItem(page, itemid);
+
+			/*
+			 * Don't include any posting list tuples.  We still always count
+			 * plain non-pivot tuples that are the only non-pivot tuples
+			 * within the whole interval, though.
+			 *
+			 * The fact that a previous merge deduplication pass was ever able
+			 * to take place suggests that the TIDs mostly point to distinct
+			 * logical rows in the table.  We may have tried to delete these
+			 * same TIDs just before this merge pass, and if we did it's
+			 * unlikely that we'll have better luck with them now.  (It's also
+			 * quite possible that there was no previous delete deduplication
+			 * pass for this page at all, which should also discourage us from
+			 * including the posting list tuple.)
+			 *
+			 * XXX: It might make sense to do more here for checkingunique
+			 * callers.  Thoughts on that design:
+			 *
+			 * Benchmarking currently suggests that being more sophisticated
+			 * with unique index posting list tuples here wouldn't actually
+			 * make all that much difference.  It's something that we could do
+			 * when we get desperate, but how long can we really expect to
+			 * hold off a page split once things get that bad?  We ought to
+			 * make non-HOT UPDATEs "work hard" to prove that a page split
+			 * caused by version churn it truly necessary, but clearly it's
+			 * possible to go too far with that.
+			 *
+			 * Consider the extreme case.  We certainly don't want to make
+			 * non-HOT UPDATEs completely exhaust every possible avenue before
+			 * they may split a leaf page.  That hurts cases with buffer lock
+			 * contention way too much, and might even make the problem worse
+			 * indirectly by hindering moving the xmin horizon forward.
+			 * Version driven page splits are not _inherently_ a bad thing;
+			 * they still make sense as an extreme solution to an extreme (and
+			 * extremely rare) problem.
+			 */
+			if (BTreeTupleIsPosting(itup))
+				continue;
+
+			/* Save relevant index tuple info for tableam call */
+			duptids[nduptids].ioffnum = dupoffnum;
+			duptids[nduptids].tid = itup->t_tid;
+			nduptids++;
+		}
+	}
+
+	/* Done with dedup state */
+	pfree(state->htids);
+	pfree(state);
+
+	/* Record exact freespace left on page (incudes line pointer overhead) */
+	freespace = PageGetExactFreeSpace(page);
+
+	if (nduptids > 0)
+	{
+		SnapshotData SnapshotNonVacuumable;
+		OffsetNumber deletable[MaxIndexTuplesPerPage];
+		int			nheapblocksaccessed;
+		int			ntableamkills;
+		int			ndeletable = 0;
+
+		/*
+		 * Determine which TIDs are dead among dups.
+		 *
+		 * We aim to delete one eighth of the duplicates (or 5 total,
+		 * whichever is greater).  There would be a good chance of deleting as
+		 * many as half of the duplicates in some common scenarios if we were
+		 * eager, but being lazy is a better trade-off.
+		 */
+		InitNonVacuumableSnapshot(SnapshotNonVacuumable,
+								  GlobalVisTestFor(heapRel));
+		ntableamkills = table_index_batch_check(heapRel, duptids, nduptids,
+												&SnapshotNonVacuumable,
+												Max(nduptids / 8, 5),
+												&nheapblocksaccessed);
+
+		/*
+		 * Look through dups array, which probably has some items that we can
+		 * delete now.
+		 *
+		 * Note: The dups array is no longer in its original order (table am
+		 * sorted it based on its own criteria).  We don't care about the
+		 * order, though.
+		 */
+		for (int i = 0; i < nduptids; i++)
+		{
+			if (duptids[i].isdead)
+			{
+				OffsetNumber deadoffnum = duptids[i].ioffnum;
+				ItemId		itemid = PageGetItemId(page, deadoffnum);
+
+				/*
+				 * Delete item, and tally how much space will be saved when
+				 * we're done with the page.
+				 *
+				 * No MarkBufferDirtyHint() call needed -- we'll physically
+				 * delete item in a moment anyway, even when we know that we
+				 * won't have freed enough space to avoid a page split (must
+				 * not return before reaching _bt_delitems_delete()).
+				 *
+				 * (Actually, we don't really need to mark the ItemId dead
+				 * either, but we do so anyway because it's expected in
+				 * opportunistic deletion code called below.)
+				 */
+				ItemIdMarkDead(itemid);
+				deletable[ndeletable++] = deadoffnum;
+				freespace += MAXALIGN(ItemIdGetLength(itemid)) +
+							 sizeof(ItemIdData);
+			}
+
+			if (ntableamkills == ndeletable)
+				break;
+		}
+
+		Assert(ntableamkills == ndeletable);
+
+		if (ndeletable > 0)
+		{
+			/* Have to give array to _bt_delitems_delete in asc order */
+			qsort(deletable, ndeletable, sizeof(OffsetNumber),
+				  _bt_offsetnumbercmp);
+
+			/* Actually delete items */
+			_bt_delitems_delete(rel, buf, deletable, ndeletable, heapRel);
+		}
+	}
+
+	pfree(duptids);
+
+	/* Return success when page split (or merge deduplication pass) avoided */
+	Assert(freespace == PageGetExactFreeSpace(page));
+	return freespace >= newitemsz;
+}
+
+/*
+ * Perform a merge deduplication pass.
+ *
+ * The general approach taken here is to perform as much deduplication as
+ * possible to free as much space as possible.  Note, however, that "single
+ * value" strategy is sometimes used for !checkingunique callers, in which
+ * case deduplication will leave a few tuples untouched at the end of the
+ * page.  The general idea is to prepare the page for an anticipated page
+ * split that uses nbtsplitloc.c's "single value" strategy to determine a
+ * split point.  (There is no reason to deduplicate items that will end up on
+ * the right half of the page after the anticipated page split; better to
+ * handle those if and when the anticipated right half page gets its own
+ * deduplication pass, following further inserts of duplicates.)
+ *
+ * Note: Caller should have already deleted all existing items with their
+ * LP_DEAD bits set.
+ */
+static void
+_bt_dedup_merge_one_page(Relation rel, Buffer buf, Relation heapRel,
+						 IndexTuple newitem, Size newitemsz,
+						 bool checkingunique)
+{
+	OffsetNumber offnum,
+				minoff,
+				maxoff;
+	Page		page = BufferGetPage(buf);
+	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+	Page		newpage;
+	BTDedupState state;
+	Size		pagesaving = 0;
+	bool		singlevalstrat = false;
+	int			nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
+
 	/*
 	 * By here, it's clear that deduplication will definitely be attempted.
 	 * Initialize deduplication state.
@@ -138,6 +453,9 @@ _bt_dedup_one_page(Relation rel, Buffer buf, Relation heapRel,
 	/* nintervals should be initialized to zero */
 	state->nintervals = 0;
 
+	minoff = P_FIRSTDATAKEY(opaque);
+	maxoff = PageGetMaxOffsetNumber(page);
+
 	/* Determine if "single value" strategy should be used */
 	if (!checkingunique)
 		singlevalstrat = _bt_do_singleval(rel, page, state, minoff, newitem);
@@ -203,7 +521,7 @@ _bt_dedup_one_page(Relation rel, Buffer buf, Relation heapRel,
 			 * form new posting tuple, and actually update the page.  Else
 			 * reset the state and move on without modifying the page.
 			 */
-			pagesaving += _bt_dedup_finish_pending(newpage, state);
+			pagesaving += _bt_dedup_merge_finish_pending(newpage, state);
 
 			if (singlevalstrat)
 			{
@@ -235,7 +553,7 @@ _bt_dedup_one_page(Relation rel, Buffer buf, Relation heapRel,
 	}
 
 	/* Handle the last item */
-	pagesaving += _bt_dedup_finish_pending(newpage, state);
+	pagesaving += _bt_dedup_merge_finish_pending(newpage, state);
 
 	/*
 	 * If no items suitable for deduplication were found, newpage must be
@@ -317,8 +635,8 @@ _bt_dedup_one_page(Relation rel, Buffer buf, Relation heapRel,
  * Every tuple processed by deduplication either becomes the base tuple for a
  * posting list, or gets its heap TID(s) accepted into a pending posting list.
  * A tuple that starts out as the base tuple for a posting list will only
- * actually be rewritten within _bt_dedup_finish_pending() when it turns out
- * that there are duplicates that can be merged into the base tuple.
+ * actually be rewritten within _bt_dedup_merge_finish_pending() when it turns
+ * out that there are duplicates that can be merged into the base tuple.
  */
 void
 _bt_dedup_start_pending(BTDedupState state, IndexTuple base,
@@ -443,7 +761,7 @@ _bt_dedup_save_htid(BTDedupState state, IndexTuple itup)
  * where no deduplication was possible.
  */
 Size
-_bt_dedup_finish_pending(Page newpage, BTDedupState state)
+_bt_dedup_merge_finish_pending(Page newpage, BTDedupState state)
 {
 	OffsetNumber tupoff;
 	Size		tuplesz;
@@ -496,6 +814,38 @@ _bt_dedup_finish_pending(Page newpage, BTDedupState state)
 	return spacesaving;
 }
 
+/*
+ * Stripped down version of _bt_dedup_merge_finish_pending() used by
+ * _bt_dedup_delete_one_page().
+ *
+ * Finalize deduplication interval/duplicate group without materializing the
+ * would-be posting list tuple.
+ */
+static void
+_bt_dedup_delete_finish_pending(BTDedupState state)
+{
+	Assert(state->nitems > 0);
+	Assert(state->nitems <= state->nhtids);
+	Assert(state->intervals[state->nintervals].baseoff == state->baseoff);
+
+	if (state->nitems == 1)
+	{
+		/* Don't merge */
+	}
+	else
+	{
+		/* Save final number of items for posting list */
+		state->intervals[state->nintervals].nitems = state->nitems;
+		/* Increment nintervals, since we wrote a new posting list tuple */
+		state->nintervals++;
+	}
+
+	/* Reset state for next pending posting list */
+	state->nhtids = 0;
+	state->nitems = 0;
+	state->phystupsize = 0;
+}
+
 /*
  * Determine if page non-pivot tuples (data items) are all duplicates of the
  * same value -- if they are, deduplication's "single value" strategy should
@@ -809,6 +1159,25 @@ _bt_swap_posting(IndexTuple newitem, IndexTuple oposting, int postingoff)
 	return nposting;
 }
 
+/*
+ * qsort-style comparator used by _bt_dedup_delete_one_page()
+ */
+static int
+_bt_offsetnumbercmp(const void *arg1, const void *arg2)
+{
+	OffsetNumber *inter1 = (OffsetNumber *) arg1;
+	OffsetNumber *inter2 = (OffsetNumber *) arg2;
+
+	if (*inter1 > *inter2)
+		return 1;
+	if (*inter1 < *inter2)
+		return -1;
+
+	Assert(false);
+
+	return 0;
+}
+
 /*
  * Verify posting list invariants for "posting", which must be a posting list
  * tuple.  Used within assertions.
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index d36f7557c8..af5aa849ef 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -37,6 +37,7 @@ static TransactionId _bt_check_unique(Relation rel, BTInsertState insertstate,
 static OffsetNumber _bt_findinsertloc(Relation rel,
 									  BTInsertState insertstate,
 									  bool checkingunique,
+									  bool logicallymodified,
 									  BTStack stack,
 									  Relation heapRel);
 static void _bt_stepright(Relation rel, BTInsertState insertstate, BTStack stack);
@@ -86,7 +87,9 @@ _bt_doinsert(Relation rel, IndexTuple itup,
 	BTInsertStateData insertstate;
 	BTScanInsert itup_key;
 	BTStack		stack;
-	bool		checkingunique = (checkUnique != UNIQUE_CHECK_NO);
+	bool		checkingunique = (checkUnique != UNIQUE_CHECK_NO &&
+								  checkUnique != UNIQUE_CHECK_NO_WITH_UNCHANGED);
+	bool		logicallymodified = (checkUnique != UNIQUE_CHECK_NO_WITH_UNCHANGED);
 
 	/* we need an insertion scan key to do our search, so build one */
 	itup_key = _bt_mkscankey(rel, itup);
@@ -235,7 +238,7 @@ search:
 		 * checkingunique.
 		 */
 		newitemoff = _bt_findinsertloc(rel, &insertstate, checkingunique,
-									   stack, heapRel);
+									   logicallymodified, stack, heapRel);
 		_bt_insertonpg(rel, itup_key, insertstate.buf, InvalidBuffer, stack,
 					   itup, insertstate.itemsz, newitemoff,
 					   insertstate.postingoff, false);
@@ -767,6 +770,11 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
  *		the right, rather than the first page.  In that case, this function
  *		moves right to the correct target page.
  *
+ *		If 'logicallymodified' is false, this is for an UPDATE that didn't
+ *		logically change the indexed value, but must nevertheless have a new
+ *		entry to point to a successor version.  This hint from the executor
+ *		influences the behavior of deduplication.
+ *
  *		(In a !heapkeyspace index, there can be multiple pages with the same
  *		high key, where the new tuple could legitimately be placed on.  In
  *		that case, the caller passes the first page containing duplicates,
@@ -790,6 +798,7 @@ static OffsetNumber
 _bt_findinsertloc(Relation rel,
 				  BTInsertState insertstate,
 				  bool checkingunique,
+				  bool logicallymodified,
 				  BTStack stack,
 				  Relation heapRel)
 {
@@ -873,14 +882,21 @@ _bt_findinsertloc(Relation rel,
 		/*
 		 * If the target page is full, see if we can obtain enough space by
 		 * erasing LP_DEAD items.  If that fails to free enough space, see if
-		 * we can avoid a page split by performing a deduplication pass over
-		 * the page.
+		 * we can avoid a page split by performing deduplication.  Usually
+		 * this means a deduplication merge pass, though a deduplication
+		 * delete pass is preferred when it looks like version churn is the
+		 * source of most of the duplicates (nbtdedup.c decides which approach
+		 * to take based on the checkingunique and logicallymodified flags).
 		 *
-		 * We only perform a deduplication pass for a checkingunique caller
-		 * when the incoming item is a duplicate of an existing item on the
-		 * leaf page.  This heuristic avoids wasting cycles -- we only expect
-		 * to benefit from deduplicating a unique index page when most or all
-		 * recently added items are duplicates.  See nbtree/README.
+		 * We only consider deduplication for a checkingunique caller when the
+		 * incoming item is a known duplicate of an existing item on the leaf
+		 * page.  This heuristic avoids wasting cycles.  The overarching goal
+		 * within a unique index is to prevent an unnecessary page split
+		 * altogether by delaying splits again and again (the goal is not to
+		 * save space).  If even one incoming tuple that gets added to this
+		 * page originates with an INSERT statement then a page split is all
+		 * but inevitable anyway --- that's why it's okay that our heuristic
+		 * only considers the current incoming newitem.  See nbtree/README.
 		 */
 		if (PageGetFreeSpace(page) < insertstate->itemsz)
 		{
@@ -893,13 +909,13 @@ _bt_findinsertloc(Relation rel,
 				uniquedup = true;
 			}
 
-			if (itup_key->allequalimage && BTGetDeduplicateItems(rel) &&
-				(!checkingunique || uniquedup) &&
+			if (BTGetDeduplicateItems(rel) && (!checkingunique || uniquedup) &&
 				PageGetFreeSpace(page) < insertstate->itemsz)
 			{
 				_bt_dedup_one_page(rel, insertstate->buf, heapRel,
 								   insertstate->itup, insertstate->itemsz,
-								   checkingunique);
+								   checkingunique, logicallymodified,
+								   itup_key->allequalimage);
 				insertstate->bounds_valid = false;
 			}
 		}
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index efee86784b..ecfe79badb 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -273,7 +273,7 @@ static void _bt_sortaddtup(Page page, Size itemsize,
 						   bool newfirstdataitem);
 static void _bt_buildadd(BTWriteState *wstate, BTPageState *state,
 						 IndexTuple itup, Size truncextra);
-static void _bt_sort_dedup_finish_pending(BTWriteState *wstate,
+static void _bt_dedup_sort_finish_pending(BTWriteState *wstate,
 										  BTPageState *state,
 										  BTDedupState dstate);
 static void _bt_uppershutdown(BTWriteState *wstate, BTPageState *state);
@@ -1068,11 +1068,11 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
  * Finalize pending posting list tuple, and add it to the index.  Final tuple
  * is based on saved base tuple, and saved list of heap TIDs.
  *
- * This is almost like _bt_dedup_finish_pending(), but it adds a new tuple
- * using _bt_buildadd().
+ * This is almost like _bt_dedup_merge_finish_pending(), but it adds a new
+ * tuple using _bt_buildadd().
  */
 static void
-_bt_sort_dedup_finish_pending(BTWriteState *wstate, BTPageState *state,
+_bt_dedup_sort_finish_pending(BTWriteState *wstate, BTPageState *state,
 							  BTDedupState dstate)
 {
 	Assert(dstate->nitems > 0);
@@ -1371,7 +1371,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 				 * _bt_dedup_save_htid() opted to not merge current item into
 				 * pending posting list.
 				 */
-				_bt_sort_dedup_finish_pending(wstate, state, dstate);
+				_bt_dedup_sort_finish_pending(wstate, state, dstate);
 				pfree(dstate->base);
 
 				/* start new pending posting list with itup copy */
@@ -1390,7 +1390,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 			 * Handle the last item (there must be a last item when the
 			 * tuplesort returned one or more tuples)
 			 */
-			_bt_sort_dedup_finish_pending(wstate, state, dstate);
+			_bt_dedup_sort_finish_pending(wstate, state, dstate);
 			pfree(dstate->base);
 			pfree(dstate->htids);
 		}
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index bda9be2348..9186bdeea5 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -530,12 +530,12 @@ btree_xlog_dedup(XLogReaderState *record)
 			}
 			else
 			{
-				_bt_dedup_finish_pending(newpage, state);
+				_bt_dedup_merge_finish_pending(newpage, state);
 				_bt_dedup_start_pending(state, itup, offnum);
 			}
 		}
 
-		_bt_dedup_finish_pending(newpage, state);
+		_bt_dedup_merge_finish_pending(newpage, state);
 		Assert(state->nintervals == xlrec->nintervals);
 		Assert(memcmp(state->intervals, intervals,
 					  state->nintervals * sizeof(BTDedupInterval)) == 0);
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 6438c45716..abe69db84e 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -30,6 +30,11 @@
 #include "storage/shmem.h"
 #include "storage/smgr.h"
 
+static void table_index_batch_check_block_count_sort(TM_IndexDelete *duptids,
+													 int nduptids);
+static int	indexdelete_tids_cmp(const void *arg1, const void *arg2);
+static int	indexdeletecount_ntids_cmp(const void *arg1, const void *arg2);
+
 /*
  * Constants to control the behavior of block allocation to parallel workers
  * during a parallel seqscan.  Technically these values do not need to be
@@ -207,9 +212,9 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc parallel_scan)
 /*
  * To perform that check simply start an index scan, create the necessary
  * slot, do the heap lookup, and shut everything down again. This could be
- * optimized, but is unlikely to matter from a performance POV. If there
- * frequently are live index pointers also matching a unique index key, the
- * CPU overhead of this routine is unlikely to matter.
+ * optimized, but is unlikely to matter from a performance POV. Note that
+ * table_index_batch_check() is optimized in this way, since it is designed
+ * as a batch operation.
  *
  * Note that *tid may be modified when we return true if the AM supports
  * storing multiple row versions reachable via a single index entry (like
@@ -236,6 +241,112 @@ table_index_fetch_tuple_check(Relation rel,
 	return found;
 }
 
+/*
+ * Specialized variant of table_index_fetch_tuple_check() that can be used
+ * by index AMs to perform "bottom up" deletion of duplicate index tuples.
+ * This is particularly likely to work well with unique indexes.
+ *
+ * Note: This routine sorts the duptids array, but does not modify any
+ * individual entry accept to mark it as dead for caller.
+ *
+ * Returns total number of duptids that can be killed in index by caller.
+ *
+ * TODO: This should be combined with the equivalent of a call to
+ * table_compute_xid_horizon_for_tuples().
+ */
+int
+table_index_batch_check(Relation rel, TM_IndexDelete *duptids, int nduptids,
+						Snapshot snapshot, int nkillsneeded,
+						int *nblocksaccessed)
+{
+	IndexFetchTableData *scan;
+	TupleTableSlot *slot;
+	int			nkills = 0;
+	BlockNumber last = InvalidBlockNumber;
+	bool		final_block = false;
+
+	slot = table_slot_create(rel, NULL);
+	scan = table_index_fetch_begin(rel);
+
+	*nblocksaccessed = 0;
+	table_index_batch_check_block_count_sort(duptids, nduptids);
+	for (int i = 0; i < nduptids; i++)
+	{
+		ItemPointer tid = &(duptids + i)->tid;
+		bool		new_block = last != ItemPointerGetBlockNumber(tid);
+		ItemPointerData tmp;
+		bool		call_again = false;
+		bool		all_dead = false;
+		bool		found;
+
+		Assert(!duptids[i].isdead);
+
+		/*
+		 * Never access more than 5 blocks, no matter what.
+		 */
+		if (new_block && *nblocksaccessed >= 5)
+			break;
+
+		/*
+		 * New block encountered, but last block we processed is supposed to
+		 * be final block.  Quit now -- don't access this new block at all.
+		 */
+		if (new_block && final_block)
+			break;
+
+		/*
+		 * Quit when we're about to access a third table block and have no
+		 * kills to show for accessing the first two.
+		 */
+		if (new_block && *nblocksaccessed >= 2 && nkills == 0)
+			break;
+
+		/*
+		 * Lower the bar when we've already accessed 3 blocks, and would
+		 * otherwise access a fourth now -- quit when we have only had 3+
+		 * kills
+		 */
+		if (new_block && *nblocksaccessed >= 3 && nkills >= 3)
+			break;
+
+		tmp = *tid;
+		found = table_index_fetch_tuple(scan, &tmp, snapshot, slot,
+										&call_again, &all_dead);
+
+		if (new_block)
+			(*nblocksaccessed)++;
+		last = ItemPointerGetBlockNumber(tid);
+		if (!found && all_dead)
+		{
+			duptids[i].isdead = true;
+			nkills++;
+		}
+
+		if (nkills >= nkillsneeded)
+		{
+			/*
+			 * Caller is satisfied, so we can quit now.  But before we do,
+			 * might as well finish off remaining TIDs on same table page (if
+			 * any).  Indicate that the current block we're processing is the
+			 * final one we intend to process.
+			 *
+			 * We only quit when we've accessed at least two blocks already
+			 * (at least within a typical identity column style primary key
+			 * with version churn).  It's not uncommon for us to find all the
+			 * kills that caller needs having only accessed one table block,
+			 * but when that happens it seems like a good idea to be ambitious
+			 * about finding more tuples to kill.
+			 */
+			if (*nblocksaccessed >= 2)
+				final_block = true;
+		}
+	}
+
+	table_index_fetch_end(scan);
+	ExecDropSingleTupleTableSlot(slot);
+
+	return nkills;
+}
 
 /* ------------------------------------------------------------------------
  * Functions for non-modifying operations on individual tuples
@@ -356,7 +467,7 @@ simple_table_tuple_update(Relation rel, ItemPointer otid,
 								GetCurrentCommandId(true),
 								snapshot, InvalidSnapshot,
 								true /* wait for commit */ ,
-								&tmfd, &lockmode, update_indexes);
+								&tmfd, &lockmode, update_indexes, NULL);
 
 	switch (result)
 	{
@@ -763,3 +874,127 @@ table_block_relation_estimate_size(Relation rel, int32 *attr_widths,
 	else
 		*allvisfrac = (double) relallvisible / curpages;
 }
+
+typedef struct TM_IndexDeleteCounts
+{
+	BlockNumber table_block;
+	int			ntids_in_block;
+} TM_IndexDeleteCounts;
+
+/*
+ * table_index_batch_check() requires that the duptids array be in a certain
+ * order before it gets started.  This helper routine handles that.
+ *
+ * TIDs are grouped together by block number, with ascending TID order within
+ * each group (i.e. in ascending TID offset number order).  The block number
+ * groups are ordered according to the total number of candidate TIDs.  This
+ * order maximizes the final number of TIDs that caller can kill in index
+ * relative to the number of tableam blocks accessed.
+ *
+ * The goal of the sort order is to process as many dup table TIDs as
+ * possible with as few table buffer accesses as possible.  In practice it's
+ * frequently possible to kill relatively many TIDs with only one or two
+ * table page accesses due to the effect of locality.
+ */
+static void
+table_index_batch_check_block_count_sort(TM_IndexDelete *duptids, int nduptids)
+{
+	TM_IndexDeleteCounts *blockcounts;
+	TM_IndexDelete *reorderedduptids;
+	BlockNumber curblock = InvalidBlockNumber;
+	int			nblock_groups = 0;
+	int			ncopied = 0;
+
+	Assert(nduptids > 0);
+
+	/* First sort caller's array by TID */
+	qsort(duptids, nduptids, sizeof(TM_IndexDelete), indexdelete_tids_cmp);
+
+	/* Calculate per-table-block count of TIDs */
+	blockcounts = palloc(sizeof(TM_IndexDeleteCounts) * nduptids);
+	for (int i = 0; i < nduptids; i++)
+	{
+		ItemPointer duptid = &duptids[i].tid;
+
+		if (curblock != ItemPointerGetBlockNumber(duptid))
+		{
+			/* New block group */
+			nblock_groups++;
+
+			curblock = ItemPointerGetBlockNumber(duptid);
+			blockcounts[nblock_groups - 1].table_block = curblock;
+			blockcounts[nblock_groups - 1].ntids_in_block = 1;
+		}
+		else
+		{
+			blockcounts[nblock_groups - 1].ntids_in_block++;
+		}
+	}
+
+	/* Sort blockcounts by count in desc order, then tiebreak on block number */
+	qsort(blockcounts, nblock_groups, sizeof(TM_IndexDeleteCounts),
+		  indexdeletecount_ntids_cmp);
+	reorderedduptids = palloc0(nduptids * sizeof(TM_IndexDelete));
+	for (int i = 0; i < nblock_groups; i++)
+	{
+		TM_IndexDeleteCounts *blockgroup = blockcounts + i;
+
+		for (int j = 0; j < nduptids; j++)
+		{
+			ItemPointer tid = &duptids[j].tid;
+
+			if (blockgroup->table_block == ItemPointerGetBlockNumber(tid))
+			{
+				memcpy(reorderedduptids + ncopied, duptids + j,
+					   sizeof(TM_IndexDelete) * blockgroup->ntids_in_block);
+				ncopied += blockgroup->ntids_in_block;
+				break;			/* Move on to next table block group */
+			}
+		}
+	}
+
+	/* Copy back final sorted array into caller's array */
+	memcpy(duptids, reorderedduptids, sizeof(TM_IndexDelete) * nduptids);
+
+	/* be tidy */
+	pfree(reorderedduptids);
+	pfree(blockcounts);
+}
+
+/*
+ * qsort-style comparator used in table_index_batch_check_block_count_sort()
+ */
+static int
+indexdelete_tids_cmp(const void *arg1, const void *arg2)
+{
+	TM_IndexDelete *indexdelete1 = (TM_IndexDelete *) arg1;
+	TM_IndexDelete *indexdelete2 = (TM_IndexDelete *) arg2;
+
+	return ItemPointerCompare(&indexdelete1->tid, &indexdelete2->tid);
+}
+
+/*
+ * qsort-style comparator used in table_index_batch_check_block_count_sort()
+ */
+static int
+indexdeletecount_ntids_cmp(const void *arg1, const void *arg2)
+{
+	TM_IndexDeleteCounts *count1 = (TM_IndexDeleteCounts *) arg1;
+	TM_IndexDeleteCounts *count2 = (TM_IndexDeleteCounts *) arg2;
+
+	/* Invert usual order here to get desc ntids_in_block sort order */
+	if (count1->ntids_in_block > count2->ntids_in_block)
+		return -1;
+	if (count1->ntids_in_block < count2->ntids_in_block)
+		return 1;
+
+	/* Tiebreak on block number (this is asc order) */
+	if (count1->table_block > count2->table_block)
+		return 1;
+	if (count1->table_block < count2->table_block)
+		return -1;
+
+	Assert(false);
+
+	return 0;
+}
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 531bd7c73a..b9b625b883 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2523,7 +2523,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 			recheckIndexes =
 				ExecInsertIndexTuples(resultRelInfo,
 									  buffer->slots[i], estate, false, NULL,
-									  NIL);
+									  NIL, NULL);
 			ExecARInsertTriggers(estate, resultRelInfo,
 								 slots[i], recheckIndexes,
 								 cstate->transition_capture);
@@ -3285,7 +3285,8 @@ CopyFrom(CopyState cstate)
 																   estate,
 																   false,
 																   NULL,
-																   NIL);
+																   NIL,
+																   NULL);
 					}
 
 					/* AFTER ROW INSERT Triggers */
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index c6b5bcba7b..d171d26b69 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -275,7 +275,8 @@ ExecInsertIndexTuples(ResultRelInfo *resultRelInfo,
 					  EState *estate,
 					  bool noDupErr,
 					  bool *specConflict,
-					  List *arbiterIndexes)
+					  List *arbiterIndexes,
+					  Bitmapset *modified_attrs_hint)
 {
 	ItemPointer tupleid = &slot->tts_tid;
 	List	   *result = NIL;
@@ -389,6 +390,44 @@ ExecInsertIndexTuples(ResultRelInfo *resultRelInfo,
 		else
 			checkUnique = UNIQUE_CHECK_PARTIAL;
 
+		/*
+		 * We may have to hint to index am that this is a logically unchanged
+		 * index tuple.  This happens when we're inserting a duplicate tuple
+		 * just to represent the successor version.
+		 */
+		if (checkUnique == UNIQUE_CHECK_NO && modified_attrs_hint)
+		{
+			bool		logicallyModified = false;
+
+			for (int attr = 0; attr < indexInfo->ii_NumIndexAttrs; attr++)
+			{
+				int			keycol = indexInfo->ii_IndexAttrNumbers[attr];
+
+				if (keycol > 0)
+				{
+					logicallyModified =
+						bms_is_member(keycol - FirstLowInvalidHeapAttributeNumber,
+									  modified_attrs_hint);
+					if (logicallyModified)
+						break;
+				}
+				else
+				{
+					/*
+					 * XXX: For now we always assume that expression indexes
+					 * and indexes with whole-row vars were not modified by an
+					 * UPDATE (i.e. they just use the dedup delete
+					 * optimization regardless of the details of the UPDATE).
+					 * Review this decision when the high level design is a
+					 * bit better worked out.
+					 */
+				}
+			}
+
+			if (!logicallyModified)
+				checkUnique = UNIQUE_CHECK_NO_WITH_UNCHANGED;
+		}
+
 		satisfiesConstraint =
 			index_insert(indexRelation, /* index relation */
 						 values,	/* array of index Datums */
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 01d26881e7..e97d05b448 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -445,7 +445,7 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 		if (resultRelInfo->ri_NumIndices > 0)
 			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
 												   slot, estate, false, NULL,
-												   NIL);
+												   NIL, NULL);
 
 		/* AFTER ROW INSERT Triggers */
 		ExecARInsertTriggers(estate, resultRelInfo, slot,
@@ -513,7 +513,7 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 		if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
 			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
 												   slot, estate, false, NULL,
-												   NIL);
+												   NIL, NULL);
 
 		/* AFTER ROW UPDATE Triggers */
 		ExecARUpdateTriggers(estate, resultRelInfo,
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 0c055ed408..a522c952e5 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -605,7 +605,7 @@ ExecInsert(ModifyTableState *mtstate,
 			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
 												   slot, estate, true,
 												   &specConflict,
-												   arbiterIndexes);
+												   arbiterIndexes, NULL);
 
 			/* adjust the tuple's state accordingly */
 			table_tuple_complete_speculative(resultRelationDesc, slot,
@@ -644,7 +644,7 @@ ExecInsert(ModifyTableState *mtstate,
 			if (resultRelInfo->ri_NumIndices > 0)
 				recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
 													   slot, estate, false,
-													   NULL, NIL);
+													   NULL, NIL, NULL);
 		}
 	}
 
@@ -1238,6 +1238,7 @@ ExecUpdate(ModifyTableState *mtstate,
 	TM_Result	result;
 	TM_FailureData tmfd;
 	List	   *recheckIndexes = NIL;
+	Bitmapset  *modified_attrs_hint = NULL;
 
 	/*
 	 * abort the operation if not running transactions
@@ -1401,7 +1402,8 @@ lreplace:;
 									estate->es_snapshot,
 									estate->es_crosscheck_snapshot,
 									true /* wait for commit */ ,
-									&tmfd, &lockmode, &update_indexes);
+									&tmfd, &lockmode, &update_indexes,
+									&modified_attrs_hint);
 
 		switch (result)
 		{
@@ -1534,7 +1536,8 @@ lreplace:;
 		if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
 			recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
 												   slot, estate, false,
-												   NULL, NIL);
+												   NULL, NIL,
+												   modified_attrs_hint);
 	}
 
 	if (canSetTag)
-- 
2.25.1

