From 1d3dc0c37714f7d9ba1f168e0dbc54578fab3c88 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Fri, 4 Oct 2019 14:24:28 -0400
Subject: [PATCH v7] Allow TOAST tables to be implemented using table AMs other
 than heap.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

toast_fetch_datum, toast_save_datum, and toast_delete_datum are
adjusted to use tableam rather than heap-specific functions.  This
might have some performance impact, but this patch attempts to
mitigate that by restructuring things so that we don't open and close
the toast table and indexes multiple times per tuple.

tableam now exposes an integer value (not a callback) for the
maximum TOAST chunk size, and has a new callback allowing table
AMs to specify the AM that should be used to implement the TOAST
table. Previously, the toast AM was always the same as the table AM.

Patch by me, reviewed and tested by Prabhat Sabu, Thomas Munro,
Andres Freund, and Álvaro Herrera.

Discussion: http://postgr.es/m/CA+TgmoZv-=2iWM4jcw5ZhJeL18HF96+W1yJeYrnGMYdkFFnEpQ@mail.gmail.com
---
 src/backend/access/common/detoast.c         |  62 +++++-----
 src/backend/access/common/toast_internals.c | 127 +++++++-------------
 src/backend/access/heap/heapam.c            |   6 +-
 src/backend/access/heap/heapam_handler.c    |  14 ++-
 src/backend/access/heap/heaptoast.c         |  19 ++-
 src/backend/access/index/genam.c            |  20 +++
 src/backend/access/table/toast_helper.c     | 107 ++++++++++++++---
 src/backend/catalog/toasting.c              |   2 +-
 src/include/access/genam.h                  |   5 +-
 src/include/access/heapam.h                 |   3 +-
 src/include/access/heaptoast.h              |   2 +-
 src/include/access/tableam.h                |  31 +++++
 src/include/access/toast_helper.h           |  18 ++-
 src/include/access/toast_internals.h        |  15 ++-
 14 files changed, 283 insertions(+), 148 deletions(-)

diff --git a/src/backend/access/common/detoast.c b/src/backend/access/common/detoast.c
index b25ca6810b..8550587c08 100644
--- a/src/backend/access/common/detoast.c
+++ b/src/backend/access/common/detoast.c
@@ -15,10 +15,11 @@
 
 #include "access/detoast.h"
 #include "access/genam.h"
-#include "access/heaptoast.h"
 #include "access/table.h"
 #include "access/toast_internals.h"
+#include "access/tableam.h"
 #include "common/pg_lzcompress.h"
+#include "executor/tuptable.h"
 #include "utils/expandeddatum.h"
 #include "utils/fmgroids.h"
 #include "utils/rel.h"
@@ -327,8 +328,7 @@ toast_fetch_datum(struct varlena *attr)
 	Relation   *toastidxs;
 	ScanKeyData toastkey;
 	SysScanDesc toastscan;
-	HeapTuple	ttup;
-	TupleDesc	toasttupDesc;
+	TupleTableSlot *slot;
 	struct varlena *result;
 	struct varatt_external toast_pointer;
 	int32		ressize;
@@ -336,11 +336,11 @@ toast_fetch_datum(struct varlena *attr)
 				nextidx;
 	int32		numchunks;
 	Pointer		chunk;
-	bool		isnull;
 	char	   *chunkdata;
 	int32		chunksize;
 	int			num_indexes;
 	int			validIndex;
+	int			max_chunk_size;
 	SnapshotData SnapshotToast;
 
 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
@@ -350,7 +350,6 @@ toast_fetch_datum(struct varlena *attr)
 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
 
 	ressize = toast_pointer.va_extsize;
-	numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
 
 	result = (struct varlena *) palloc(ressize + VARHDRSZ);
 
@@ -363,7 +362,9 @@ toast_fetch_datum(struct varlena *attr)
 	 * Open the toast relation and its indexes
 	 */
 	toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock);
-	toasttupDesc = toastrel->rd_att;
+
+	max_chunk_size = toastrel->rd_tableam->toast_max_chunk_size;
+	numchunks = ((ressize - 1) / max_chunk_size) + 1;
 
 	/* Look for the valid index of the toast relation */
 	validIndex = toast_open_indexes(toastrel,
@@ -391,15 +392,15 @@ toast_fetch_datum(struct varlena *attr)
 	init_toast_snapshot(&SnapshotToast);
 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
 										   &SnapshotToast, 1, &toastkey);
-	while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
+	while ((slot = systable_getnextslot_ordered(toastscan, ForwardScanDirection)) != NULL)
 	{
 		/*
 		 * Have a chunk, extract the sequence number and the data
 		 */
-		residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
-		Assert(!isnull);
-		chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
-		Assert(!isnull);
+		slot_getsomeattrs(slot, 3);
+		Assert(!slot->tts_isnull[1] && !slot->tts_isnull[2]);
+		residx = DatumGetInt32(slot->tts_values[1]);
+		chunk = DatumGetPointer(slot->tts_values[2]);
 		if (!VARATT_IS_EXTENDED(chunk))
 		{
 			chunksize = VARSIZE(chunk) - VARHDRSZ;
@@ -433,23 +434,23 @@ toast_fetch_datum(struct varlena *attr)
 									 RelationGetRelationName(toastrel))));
 		if (residx < numchunks - 1)
 		{
-			if (chunksize != TOAST_MAX_CHUNK_SIZE)
+			if (chunksize != max_chunk_size)
 				ereport(ERROR,
 						(errcode(ERRCODE_DATA_CORRUPTED),
 						 errmsg_internal("unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s",
-										 chunksize, (int) TOAST_MAX_CHUNK_SIZE,
+										 chunksize, max_chunk_size,
 										 residx, numchunks,
 										 toast_pointer.va_valueid,
 										 RelationGetRelationName(toastrel))));
 		}
 		else if (residx == numchunks - 1)
 		{
-			if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != ressize)
+			if ((residx * max_chunk_size + chunksize) != ressize)
 				ereport(ERROR,
 						(errcode(ERRCODE_DATA_CORRUPTED),
 						 errmsg_internal("unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s",
 										 chunksize,
-										 (int) (ressize - residx * TOAST_MAX_CHUNK_SIZE),
+										 (int) (ressize - residx * max_chunk_size),
 										 residx,
 										 toast_pointer.va_valueid,
 										 RelationGetRelationName(toastrel))));
@@ -466,7 +467,7 @@ toast_fetch_datum(struct varlena *attr)
 		/*
 		 * Copy the data into proper place in our result
 		 */
-		memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
+		memcpy(VARDATA(result) + residx * max_chunk_size,
 			   chunkdata,
 			   chunksize);
 
@@ -534,6 +535,7 @@ toast_fetch_datum_slice(struct varlena *attr, int32 sliceoffset, int32 length)
 	int32		chcpyend;
 	int			num_indexes;
 	int			validIndex;
+	int			max_chunk_size;
 	SnapshotData SnapshotToast;
 
 	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
@@ -550,7 +552,6 @@ toast_fetch_datum_slice(struct varlena *attr, int32 sliceoffset, int32 length)
 	Assert(!VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer) || 0 == sliceoffset);
 
 	attrsize = toast_pointer.va_extsize;
-	totalchunks = ((attrsize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
 
 	if (sliceoffset >= attrsize)
 	{
@@ -579,19 +580,22 @@ toast_fetch_datum_slice(struct varlena *attr, int32 sliceoffset, int32 length)
 	if (length == 0)
 		return result;			/* Can save a lot of work at this point! */
 
-	startchunk = sliceoffset / TOAST_MAX_CHUNK_SIZE;
-	endchunk = (sliceoffset + length - 1) / TOAST_MAX_CHUNK_SIZE;
-	numchunks = (endchunk - startchunk) + 1;
-
-	startoffset = sliceoffset % TOAST_MAX_CHUNK_SIZE;
-	endoffset = (sliceoffset + length - 1) % TOAST_MAX_CHUNK_SIZE;
-
 	/*
 	 * Open the toast relation and its indexes
 	 */
 	toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock);
 	toasttupDesc = toastrel->rd_att;
 
+	max_chunk_size = toastrel->rd_tableam->toast_max_chunk_size;
+	totalchunks = ((attrsize - 1) / max_chunk_size) + 1;
+
+	startchunk = sliceoffset / max_chunk_size;
+	endchunk = (sliceoffset + length - 1) / max_chunk_size;
+	numchunks = (endchunk - startchunk) + 1;
+
+	startoffset = sliceoffset % max_chunk_size;
+	endoffset = (sliceoffset + length - 1) % max_chunk_size;
+
 	/* Look for the valid index of toast relation */
 	validIndex = toast_open_indexes(toastrel,
 									AccessShareLock,
@@ -680,19 +684,19 @@ toast_fetch_datum_slice(struct varlena *attr, int32 sliceoffset, int32 length)
 				 RelationGetRelationName(toastrel));
 		if (residx < totalchunks - 1)
 		{
-			if (chunksize != TOAST_MAX_CHUNK_SIZE)
+			if (chunksize != max_chunk_size)
 				elog(ERROR, "unexpected chunk size %d (expected %d) in chunk %d of %d for toast value %u in %s when fetching slice",
-					 chunksize, (int) TOAST_MAX_CHUNK_SIZE,
+					 chunksize, max_chunk_size,
 					 residx, totalchunks,
 					 toast_pointer.va_valueid,
 					 RelationGetRelationName(toastrel));
 		}
 		else if (residx == totalchunks - 1)
 		{
-			if ((residx * TOAST_MAX_CHUNK_SIZE + chunksize) != attrsize)
+			if ((residx * max_chunk_size + chunksize) != attrsize)
 				elog(ERROR, "unexpected chunk size %d (expected %d) in final chunk %d for toast value %u in %s when fetching slice",
 					 chunksize,
-					 (int) (attrsize - residx * TOAST_MAX_CHUNK_SIZE),
+					 (int) (attrsize - residx * max_chunk_size),
 					 residx,
 					 toast_pointer.va_valueid,
 					 RelationGetRelationName(toastrel));
@@ -715,7 +719,7 @@ toast_fetch_datum_slice(struct varlena *attr, int32 sliceoffset, int32 length)
 			chcpyend = endoffset;
 
 		memcpy(VARDATA(result) +
-			   (residx * TOAST_MAX_CHUNK_SIZE - sliceoffset) + chcpystrt,
+			   (residx * max_chunk_size - sliceoffset) + chcpystrt,
 			   chunkdata + chcpystrt,
 			   (chcpyend - chcpystrt) + 1);
 
diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c
index a971242490..beb303034d 100644
--- a/src/backend/access/common/toast_internals.c
+++ b/src/backend/access/common/toast_internals.c
@@ -15,9 +15,8 @@
 
 #include "access/detoast.h"
 #include "access/genam.h"
-#include "access/heapam.h"
-#include "access/heaptoast.h"
 #include "access/table.h"
+#include "access/tableam.h"
 #include "access/toast_internals.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
@@ -100,22 +99,21 @@ toast_compress_datum(Datum value)
  *	Save one single datum into the secondary relation and return
  *	a Datum reference for it.
  *
- * rel: the main relation we're working with (not the toast rel!)
+ * toastrel: the TOAST relation we're working with (not the main rel!)
+ * toastslot: a slot corresponding to 'toastrel'
+ * num_indexes, toastidxs, validIndex: as returned by toast_open_indexes
+ * toastoid: if valid, OID to store in the new TOAST pointer (else toastrel's)
  * value: datum to be pushed to toast storage
  * oldexternal: if not NULL, toast pointer previously representing the datum
- * options: options to be passed to heap_insert() for toast rows
+ * options: options to be passed to table_tuple_insert() for toast rows
  * ----------
  */
 Datum
-toast_save_datum(Relation rel, Datum value,
-				 struct varlena *oldexternal, int options)
+toast_save_datum(Relation toastrel, TupleTableSlot *toastslot,
+				 int num_indexes, Relation *toastidxs, int validIndex,
+				 Oid toastoid, Datum value, struct varlena *oldexternal,
+				 int options, int max_chunk_size)
 {
-	Relation	toastrel;
-	Relation   *toastidxs;
-	HeapTuple	toasttup;
-	TupleDesc	toasttupDesc;
-	Datum		t_values[3];
-	bool		t_isnull[3];
 	CommandId	mycid = GetCurrentCommandId(true);
 	struct varlena *result;
 	struct varatt_external toast_pointer;
@@ -123,7 +121,7 @@ toast_save_datum(Relation rel, Datum value,
 	{
 		struct varlena hdr;
 		/* this is to make the union big enough for a chunk: */
-		char		data[TOAST_MAX_CHUNK_SIZE + VARHDRSZ];
+		char		data[BLCKSZ + VARHDRSZ];
 		/* ensure union is aligned well enough: */
 		int32		align_it;
 	}			chunk_data;
@@ -132,24 +130,9 @@ toast_save_datum(Relation rel, Datum value,
 	char	   *data_p;
 	int32		data_todo;
 	Pointer		dval = DatumGetPointer(value);
-	int			num_indexes;
-	int			validIndex;
 
 	Assert(!VARATT_IS_EXTERNAL(value));
-
-	/*
-	 * Open the toast relation and its indexes.  We can use the index to check
-	 * uniqueness of the OID we assign to the toasted item, even though it has
-	 * additional columns besides OID.
-	 */
-	toastrel = table_open(rel->rd_rel->reltoastrelid, RowExclusiveLock);
-	toasttupDesc = toastrel->rd_att;
-
-	/* Open all the toast indexes and look for the valid one */
-	validIndex = toast_open_indexes(toastrel,
-									RowExclusiveLock,
-									&toastidxs,
-									&num_indexes);
+	Assert(max_chunk_size <= BLCKSZ);
 
 	/*
 	 * Get the data pointer and length, and compute va_rawsize and va_extsize.
@@ -189,11 +172,11 @@ toast_save_datum(Relation rel, Datum value,
 	 *
 	 * Normally this is the actual OID of the target toast table, but during
 	 * table-rewriting operations such as CLUSTER, we have to insert the OID
-	 * of the table's real permanent toast table instead.  rd_toastoid is set
+	 * of the table's real permanent toast table instead.  toastoid is set
 	 * if we have to substitute such an OID.
 	 */
-	if (OidIsValid(rel->rd_toastoid))
-		toast_pointer.va_toastrelid = rel->rd_toastoid;
+	if (OidIsValid(toastoid))
+		toast_pointer.va_toastrelid = toastoid;
 	else
 		toast_pointer.va_toastrelid = RelationGetRelid(toastrel);
 
@@ -209,7 +192,7 @@ toast_save_datum(Relation rel, Datum value,
 	 * options have been changed), we have to pick a value ID that doesn't
 	 * conflict with either new or existing toast value OIDs.
 	 */
-	if (!OidIsValid(rel->rd_toastoid))
+	if (!OidIsValid(toastoid))
 	{
 		/* normal case: just choose an unused OID */
 		toast_pointer.va_valueid =
@@ -228,7 +211,7 @@ toast_save_datum(Relation rel, Datum value,
 			Assert(VARATT_IS_EXTERNAL_ONDISK(oldexternal));
 			/* Must copy to access aligned fields */
 			VARATT_EXTERNAL_GET_POINTER(old_toast_pointer, oldexternal);
-			if (old_toast_pointer.va_toastrelid == rel->rd_toastoid)
+			if (old_toast_pointer.va_toastrelid == toastoid)
 			{
 				/* This value came from the old toast table; reuse its OID */
 				toast_pointer.va_valueid = old_toast_pointer.va_valueid;
@@ -270,20 +253,11 @@ toast_save_datum(Relation rel, Datum value,
 					GetNewOidWithIndex(toastrel,
 									   RelationGetRelid(toastidxs[validIndex]),
 									   (AttrNumber) 1);
-			} while (toastid_valueid_exists(rel->rd_toastoid,
+			} while (toastid_valueid_exists(toastoid,
 											toast_pointer.va_valueid));
 		}
 	}
 
-	/*
-	 * Initialize constant parts of the tuple data
-	 */
-	t_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
-	t_values[2] = PointerGetDatum(&chunk_data);
-	t_isnull[0] = false;
-	t_isnull[1] = false;
-	t_isnull[2] = false;
-
 	/*
 	 * Split up the item into chunks
 	 */
@@ -296,17 +270,22 @@ toast_save_datum(Relation rel, Datum value,
 		/*
 		 * Calculate the size of this chunk
 		 */
-		chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);
+		chunk_size = Min(max_chunk_size, data_todo);
 
 		/*
 		 * Build a tuple and store it
 		 */
-		t_values[1] = Int32GetDatum(chunk_seq++);
+		toastslot->tts_values[0] = ObjectIdGetDatum(toast_pointer.va_valueid);
+		toastslot->tts_values[1] = Int32GetDatum(chunk_seq++);
 		SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
 		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
-		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
+		toastslot->tts_values[2] = PointerGetDatum(&chunk_data);
+		toastslot->tts_isnull[0] = false;
+		toastslot->tts_isnull[1] = false;
+		toastslot->tts_isnull[2] = false;
+		ExecStoreVirtualTuple(toastslot);
 
-		heap_insert(toastrel, toasttup, mycid, options, NULL);
+		table_tuple_insert(toastrel, toastslot, mycid, options, NULL);
 
 		/*
 		 * Create the index entry.  We cheat a little here by not using
@@ -323,8 +302,9 @@ toast_save_datum(Relation rel, Datum value,
 		{
 			/* Only index relations marked as ready can be updated */
 			if (toastidxs[i]->rd_index->indisready)
-				index_insert(toastidxs[i], t_values, t_isnull,
-							 &(toasttup->t_self),
+				index_insert(toastidxs[i], toastslot->tts_values,
+							 toastslot->tts_isnull,
+							 &(toastslot->tts_tid),
 							 toastrel,
 							 toastidxs[i]->rd_index->indisunique ?
 							 UNIQUE_CHECK_YES : UNIQUE_CHECK_NO,
@@ -332,9 +312,9 @@ toast_save_datum(Relation rel, Datum value,
 		}
 
 		/*
-		 * Free memory
+		 * Clear slot
 		 */
-		heap_freetuple(toasttup);
+		ExecClearTuple(toastslot);
 
 		/*
 		 * Move on to next chunk
@@ -343,12 +323,6 @@ toast_save_datum(Relation rel, Datum value,
 		data_p += chunk_size;
 	}
 
-	/*
-	 * Done - close toast relation and its indexes
-	 */
-	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
-	table_close(toastrel, RowExclusiveLock);
-
 	/*
 	 * Create the TOAST pointer value that we'll return
 	 */
@@ -366,35 +340,24 @@ toast_save_datum(Relation rel, Datum value,
  * ----------
  */
 void
-toast_delete_datum(Relation rel, Datum value, bool is_speculative)
+toast_delete_datum(Relation toastrel, int num_indexes, Relation *toastidxs,
+				   int validIndex, Datum value, bool is_speculative,
+				   uint32 specToken)
 {
 	struct varlena *attr = (struct varlena *) DatumGetPointer(value);
 	struct varatt_external toast_pointer;
-	Relation	toastrel;
-	Relation   *toastidxs;
 	ScanKeyData toastkey;
 	SysScanDesc toastscan;
-	HeapTuple	toasttup;
-	int			num_indexes;
-	int			validIndex;
+	TupleTableSlot *slot;
 	SnapshotData SnapshotToast;
 
-	if (!VARATT_IS_EXTERNAL_ONDISK(attr))
-		return;
+	Assert(VARATT_IS_EXTERNAL_ONDISK(attr));
 
 	/* Must copy to access aligned fields */
 	VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);
 
-	/*
-	 * Open the toast relation and its indexes
-	 */
-	toastrel = table_open(toast_pointer.va_toastrelid, RowExclusiveLock);
-
-	/* Fetch valid relation used for process */
-	validIndex = toast_open_indexes(toastrel,
-									RowExclusiveLock,
-									&toastidxs,
-									&num_indexes);
+	/* Check that caller gave us the correct TOAST relation. */
+	Assert(toast_pointer.va_toastrelid == RelationGetRelid(toastrel));
 
 	/*
 	 * Setup a scan key to find chunks with matching va_valueid
@@ -412,23 +375,19 @@ toast_delete_datum(Relation rel, Datum value, bool is_speculative)
 	init_toast_snapshot(&SnapshotToast);
 	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
 										   &SnapshotToast, 1, &toastkey);
-	while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
+	while ((slot = systable_getnextslot_ordered(toastscan, ForwardScanDirection)) != NULL)
 	{
 		/*
 		 * Have a chunk, delete it
 		 */
 		if (is_speculative)
-			heap_abort_speculative(toastrel, &toasttup->t_self);
+			table_tuple_complete_speculative(toastrel, slot, specToken, false);
 		else
-			simple_heap_delete(toastrel, &toasttup->t_self);
+			simple_table_tuple_delete(toastrel, &slot->tts_tid, &SnapshotToast);
 	}
 
-	/*
-	 * End scan and close relations
-	 */
+	/* End scan */
 	systable_endscan_ordered(toastscan);
-	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
-	table_close(toastrel, RowExclusiveLock);
 }
 
 /* ----------
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 0128bb34ef..08bd087bc8 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2809,7 +2809,7 @@ l1:
 		Assert(!HeapTupleHasExternal(&tp));
 	}
 	else if (HeapTupleHasExternal(&tp))
-		heap_toast_delete(relation, &tp, false);
+		heap_toast_delete(relation, &tp, false, 0);
 
 	/*
 	 * Mark tuple for invalidation from system caches at next command
@@ -5564,7 +5564,7 @@ heap_finish_speculative(Relation relation, ItemPointer tid)
  * confirmation records.
  */
 void
-heap_abort_speculative(Relation relation, ItemPointer tid)
+heap_abort_speculative(Relation relation, ItemPointer tid, uint32 specToken)
 {
 	TransactionId xid = GetCurrentTransactionId();
 	ItemId		lp;
@@ -5673,7 +5673,7 @@ heap_abort_speculative(Relation relation, ItemPointer tid)
 	if (HeapTupleHasExternal(&tp))
 	{
 		Assert(!IsToastRelation(relation));
-		heap_toast_delete(relation, &tp, true);
+		heap_toast_delete(relation, &tp, true, specToken);
 	}
 
 	/*
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 2dd8821fac..97a7433092 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -28,6 +28,7 @@
 #include "access/rewriteheap.h"
 #include "access/tableam.h"
 #include "access/tsmapi.h"
+#include "access/heaptoast.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/index.h"
@@ -292,7 +293,7 @@ heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
 	if (succeeded)
 		heap_finish_speculative(relation, &slot->tts_tid);
 	else
-		heap_abort_speculative(relation, &slot->tts_tid);
+		heap_abort_speculative(relation, &slot->tts_tid, specToken);
 
 	if (shouldFree)
 		pfree(tuple);
@@ -2041,6 +2042,15 @@ heapam_relation_needs_toast_table(Relation rel)
 	return (tuple_length > TOAST_TUPLE_THRESHOLD);
 }
 
+/*
+ * Report rel's own table AM (relam) as the AM to use for its TOAST table.
+ */
+static Oid
+heapam_relation_toast_am(Relation rel)
+{
+	return rel->rd_rel->relam;
+}
+
 
 /* ------------------------------------------------------------------------
  * Planner related callbacks for the heap AM
@@ -2539,6 +2549,8 @@ static const TableAmRoutine heapam_methods = {
 
 	.relation_size = table_block_relation_size,
 	.relation_needs_toast_table = heapam_relation_needs_toast_table,
+	.relation_toast_am = heapam_relation_toast_am,
+	.toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE,
 
 	.relation_estimate_size = heapam_estimate_rel_size,
 
diff --git a/src/backend/access/heap/heaptoast.c b/src/backend/access/heap/heaptoast.c
index dcfdee4467..cca916a39f 100644
--- a/src/backend/access/heap/heaptoast.c
+++ b/src/backend/access/heap/heaptoast.c
@@ -38,7 +38,8 @@
  * ----------
  */
 void
-heap_toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative)
+heap_toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative,
+				  uint32 specToken)
 {
 	TupleDesc	tupleDesc;
 	Datum		toast_values[MaxHeapAttributeNumber];
@@ -68,7 +69,8 @@ heap_toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative)
 	heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull);
 
 	/* Do the real work. */
-	toast_delete_external(rel, toast_values, toast_isnull, is_speculative);
+	toast_delete_external(rel, toast_values, toast_isnull, is_speculative,
+						  specToken);
 }
 
 
@@ -151,6 +153,8 @@ heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
 		ttc.ttc_oldvalues = toast_oldvalues;
 		ttc.ttc_oldisnull = toast_oldisnull;
 	}
+	ttc.ttc_toastrel = NULL;
+	ttc.ttc_toastslot = NULL;
 	ttc.ttc_attr = toast_attr;
 	toast_tuple_init(&ttc);
 
@@ -207,7 +211,8 @@ heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
 		 */
 		if (toast_attr[biggest_attno].tai_size > maxDataLen &&
 			rel->rd_rel->reltoastrelid != InvalidOid)
-			toast_tuple_externalize(&ttc, biggest_attno, options);
+			toast_tuple_externalize(&ttc, biggest_attno, options,
+									TOAST_MAX_CHUNK_SIZE);
 	}
 
 	/*
@@ -224,7 +229,8 @@ heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
 		biggest_attno = toast_tuple_find_biggest_attribute(&ttc, false, false);
 		if (biggest_attno < 0)
 			break;
-		toast_tuple_externalize(&ttc, biggest_attno, options);
+		toast_tuple_externalize(&ttc, biggest_attno, options,
+								TOAST_MAX_CHUNK_SIZE);
 	}
 
 	/*
@@ -260,7 +266,8 @@ heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
 		if (biggest_attno < 0)
 			break;
 
-		toast_tuple_externalize(&ttc, biggest_attno, options);
+		toast_tuple_externalize(&ttc, biggest_attno, options,
+								TOAST_MAX_CHUNK_SIZE);
 	}
 
 	/*
@@ -323,7 +330,7 @@ heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup,
 	else
 		result_tuple = newtup;
 
-	toast_tuple_cleanup(&ttc);
+	toast_tuple_cleanup(&ttc, true);
 
 	return result_tuple;
 }
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 2599b5d342..233ba24261 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -642,6 +642,26 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
 	return htup;
 }
 
+/*
+ * systable_getnextslot_ordered
+ *
+ * Advance an ordered catalog scan and return the scan's own slot holding
+ * the next tuple, or NULL if there are no more tuples.
+ */
+TupleTableSlot *
+systable_getnextslot_ordered(SysScanDesc sysscan, ScanDirection direction)
+{
+	Assert(sysscan->irel);
+	if (!index_getnext_slot(sysscan->iscan, direction, sysscan->slot))
+		return NULL;
+
+	/* Rechecking lossy index quals is not implemented; see systable_getnext */
+	if (sysscan->iscan->xs_recheck)
+		elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
+
+	return sysscan->slot;
+}
+
 /*
  * systable_endscan_ordered --- close scan, release resources
  */
diff --git a/src/backend/access/table/toast_helper.c b/src/backend/access/table/toast_helper.c
index 7381be8669..dedc123e31 100644
--- a/src/backend/access/table/toast_helper.c
+++ b/src/backend/access/table/toast_helper.c
@@ -17,6 +17,7 @@
 #include "access/detoast.h"
 #include "access/table.h"
 #include "access/toast_helper.h"
+#include "access/tableam.h"
 #include "access/toast_internals.h"
 
 /*
@@ -247,26 +248,49 @@ toast_tuple_try_compression(ToastTupleContext *ttc, int attribute)
  * Move an attribute to external storage.
  */
 void
-toast_tuple_externalize(ToastTupleContext *ttc, int attribute, int options)
+toast_tuple_externalize(ToastTupleContext *ttc, int attribute, int options,
+						int max_chunk_size)
 {
 	Datum	   *value = &ttc->ttc_values[attribute];
 	Datum		old_value = *value;
 	ToastAttrInfo *attr = &ttc->ttc_attr[attribute];
 
-	attr->tai_colflags |= TOASTCOL_IGNORE;
-	*value = toast_save_datum(ttc->ttc_rel, old_value, attr->tai_oldexternal,
-							  options);
+	/* Initialize for TOAST table access, if not yet done. */
+	if (ttc->ttc_toastrel == NULL)
+	{
+		ttc->ttc_toastrel =
+			table_open(ttc->ttc_rel->rd_rel->reltoastrelid, RowExclusiveLock);
+		ttc->ttc_validtoastidx = toast_open_indexes(ttc->ttc_toastrel,
+													RowExclusiveLock,
+													&ttc->ttc_toastidxs,
+													&ttc->ttc_ntoastidxs);
+	}
+	if (ttc->ttc_toastslot == NULL)
+		ttc->ttc_toastslot = table_slot_create(ttc->ttc_toastrel, NULL);
+
+	/* Do the real work. */
+	*value = toast_save_datum(ttc->ttc_toastrel, ttc->ttc_toastslot,
+							  ttc->ttc_ntoastidxs, ttc->ttc_toastidxs,
+							  ttc->ttc_validtoastidx,
+							  ttc->ttc_rel->rd_toastoid,
+							  old_value, attr->tai_oldexternal,
+							  options, max_chunk_size);
+
+	/* Update bookkeeping information. */
 	if ((attr->tai_colflags & TOASTCOL_NEEDS_FREE) != 0)
 		pfree(DatumGetPointer(old_value));
-	attr->tai_colflags |= TOASTCOL_NEEDS_FREE;
+	attr->tai_colflags |= (TOASTCOL_NEEDS_FREE | TOASTCOL_IGNORE);
 	ttc->ttc_flags |= (TOAST_NEEDS_CHANGE | TOAST_NEEDS_FREE);
 }
 
 /*
  * Perform appropriate cleanup after one tuple has been subjected to TOAST.
+ *
+ * Pass cleanup_toastrel as true to close ttc_toastrel and its indexes and
+ * drop ttc_toastslot, resetting both to NULL; pass false if the caller will.
  */
 void
-toast_tuple_cleanup(ToastTupleContext *ttc)
+toast_tuple_cleanup(ToastTupleContext *ttc, bool cleanup_toastrel)
 {
 	TupleDesc	tupleDesc = ttc->ttc_rel->rd_att;
 	int			numAttrs = tupleDesc->natts;
@@ -294,14 +318,46 @@ toast_tuple_cleanup(ToastTupleContext *ttc)
 	{
 		int			i;
 
+		/* Initialize for TOAST table access, if not yet done. */
+		if (ttc->ttc_toastrel == NULL)
+		{
+			ttc->ttc_toastrel =
+				table_open(ttc->ttc_rel->rd_rel->reltoastrelid,
+						   RowExclusiveLock);
+			ttc->ttc_validtoastidx = toast_open_indexes(ttc->ttc_toastrel,
+														RowExclusiveLock,
+														&ttc->ttc_toastidxs,
+														&ttc->ttc_ntoastidxs);
+		}
+
+		/* Delete those attributes which require it. */
 		for (i = 0; i < numAttrs; i++)
 		{
 			ToastAttrInfo *attr = &ttc->ttc_attr[i];
 
 			if ((attr->tai_colflags & TOASTCOL_NEEDS_DELETE_OLD) != 0)
-				toast_delete_datum(ttc->ttc_rel, ttc->ttc_oldvalues[i], false);
+				toast_delete_datum(ttc->ttc_toastrel, ttc->ttc_ntoastidxs,
+								   ttc->ttc_toastidxs, ttc->ttc_validtoastidx,
+								   ttc->ttc_oldvalues[i], false, 0);
 		}
 	}
+
+	/*
+	 * Close the toast table and its indexes and drop the slot, if they were
+	 * set up earlier and the caller requests it.
+	 */
+	if (cleanup_toastrel && ttc->ttc_toastrel != NULL)
+	{
+		if (ttc->ttc_toastslot != NULL)
+		{
+			ExecDropSingleTupleTableSlot(ttc->ttc_toastslot);
+			ttc->ttc_toastslot = NULL;
+		}
+		toast_close_indexes(ttc->ttc_toastidxs, ttc->ttc_ntoastidxs,
+							RowExclusiveLock);
+		table_close(ttc->ttc_toastrel, RowExclusiveLock);
+		ttc->ttc_toastrel = NULL;
+	}
 }
 
 /*
@@ -310,22 +366,43 @@ toast_tuple_cleanup(ToastTupleContext *ttc)
  */
 void
 toast_delete_external(Relation rel, Datum *values, bool *isnull,
-					  bool is_speculative)
+					  bool is_speculative, uint32 specToken)
 {
 	TupleDesc	tupleDesc = rel->rd_att;
 	int			numAttrs = tupleDesc->natts;
 	int			i;
+	Relation    toastrel = NULL;
+	Relation   *toastidxs;
+	int         num_indexes;
+	int         validIndex;
 
 	for (i = 0; i < numAttrs; i++)
 	{
-		if (TupleDescAttr(tupleDesc, i)->attlen == -1)
-		{
-			Datum		value = values[i];
+		Datum	value;
+
+		if (isnull[i] || TupleDescAttr(tupleDesc, i)->attlen != -1)
+			continue;
+
+		value = values[i];
+		if (!VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value)))
+			continue;
 
-			if (isnull[i])
-				continue;
-			else if (VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value)))
-				toast_delete_datum(rel, value, is_speculative);
+		/* Initialize for TOAST table access, if not yet done. */
+		if (toastrel == NULL)
+		{
+			toastrel = table_open(rel->rd_rel->reltoastrelid,
+								  RowExclusiveLock);
+			validIndex = toast_open_indexes(toastrel, RowExclusiveLock,
+											&toastidxs, &num_indexes);
 		}
+
+		toast_delete_datum(toastrel, num_indexes, toastidxs, validIndex,
+						   value, is_speculative, specToken);
+	}
+
+	if (toastrel != NULL)
+	{
+		toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
+		table_close(toastrel, RowExclusiveLock);
 	}
 }
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index de6282a667..f082463bf6 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -258,7 +258,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
 										   toast_typid,
 										   InvalidOid,
 										   rel->rd_rel->relowner,
-										   rel->rd_rel->relam,
+										   table_relation_toast_am(rel),
 										   tupdesc,
 										   NIL,
 										   RELKIND_TOASTVALUE,
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index a813b004be..128df5e916 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -21,8 +21,9 @@
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
 
-/* We don't want this file to depend on execnodes.h. */
+/* We don't want this file to depend on execnodes.h or tuptable.h. */
 struct IndexInfo;
+struct TupleTableSlot;
 
 /*
  * Struct for statistics returned by ambuild
@@ -220,6 +221,8 @@ extern SysScanDesc systable_beginscan_ordered(Relation heapRelation,
 											  int nkeys, ScanKey key);
 extern HeapTuple systable_getnext_ordered(SysScanDesc sysscan,
 										  ScanDirection direction);
+extern struct TupleTableSlot *systable_getnextslot_ordered(SysScanDesc sysscan,
+														   ScanDirection direction);
 extern void systable_endscan_ordered(SysScanDesc sysscan);
 
 #endif							/* GENAM_H */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 858bcb6bc9..6ee0c6efa7 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -143,7 +143,8 @@ extern TM_Result heap_delete(Relation relation, ItemPointer tid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart);
 extern void heap_finish_speculative(Relation relation, ItemPointer tid);
-extern void heap_abort_speculative(Relation relation, ItemPointer tid);
+extern void heap_abort_speculative(Relation relation, ItemPointer tid,
+									uint32 specToken);
 extern TM_Result heap_update(Relation relation, ItemPointer otid,
 							 HeapTuple newtup,
 							 CommandId cid, Snapshot crosscheck, bool wait,
diff --git a/src/include/access/heaptoast.h b/src/include/access/heaptoast.h
index 488a2e4a7f..23f62dc4af 100644
--- a/src/include/access/heaptoast.h
+++ b/src/include/access/heaptoast.h
@@ -104,7 +104,7 @@ extern HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup,
  * ----------
  */
 extern void heap_toast_delete(Relation rel, HeapTuple oldtup,
-							  bool is_speculative);
+							  bool is_speculative, uint32 specToken);
 
 /* ----------
  * toast_flatten_tuple -
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 7f81703b78..521fd6232d 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -581,6 +581,27 @@ typedef struct TableAmRoutine
 	 */
 	bool		(*relation_needs_toast_table) (Relation rel);
 
+	/*
+	 * This callback should return the OID of the table AM that implements
+	 * TOAST tables for this AM.  If the relation_needs_toast_table callback
+	 * always returns false, this callback is not required.
+	 */
+	Oid			(*relation_toast_am) (Relation rel);
+
+	/*
+	 * If this table AM can be used to implement a TOAST table, the following
+	 * field should be set to the maximum number of bytes that can be stored
+	 * in a single TOAST chunk.  It must not be set to a value greater than
+	 * BLCKSZ.  If this table AM is not used to implement a TOAST table, this
+	 * value is ignored.
+	 *
+	 * (Note that there is no requirement that the TOAST table be implemented
+	 * using the same AM as the table to which it is attached.  If this AM
+	 * has TOAST tables but uses some other AM to implement them, this value
+	 * is ignored; it is a property of the TOAST table, not the parent table.)
+	 */
+	int			toast_max_chunk_size;
+
 
 	/* ------------------------------------------------------------------------
 	 * Planner related functions.
@@ -1603,6 +1624,16 @@ table_relation_needs_toast_table(Relation rel)
 	return rel->rd_tableam->relation_needs_toast_table(rel);
 }
 
+/*
+ * Return the OID of the AM that should implement this relation's TOAST table,
+ * as reported by its table AM's relation_toast_am callback.
+ */
+static inline Oid
+table_relation_toast_am(Relation rel)
+{
+	return rel->rd_tableam->relation_toast_am(rel);
+}
+
 
 /* ----------------------------------------------------------------------------
  * Planner related functionality
diff --git a/src/include/access/toast_helper.h b/src/include/access/toast_helper.h
index 7cefacb0ea..cfb4ae0385 100644
--- a/src/include/access/toast_helper.h
+++ b/src/include/access/toast_helper.h
@@ -14,6 +14,7 @@
 #ifndef TOAST_HELPER_H
 #define TOAST_HELPER_H
 
+#include "executor/tuptable.h"
 #include "utils/rel.h"
 
 /*
@@ -51,6 +52,17 @@ typedef struct
 	Datum	   *ttc_oldvalues;	/* values from previous tuple */
 	bool	   *ttc_oldisnull;	/* null flags from previous tuple */
 
+	/*
+	 * Before calling toast_tuple_init, the caller should either initialize
+	 * all of these fields or else set ttc_toastrel and ttc_toastslot to NULL.
+	 * In the latter case, all of the fields will be initialized as required.
+	 */
+	Relation	ttc_toastrel;	/* the toast table for the relation */
+	TupleTableSlot *ttc_toastslot;	/* a slot for the toast table */
+	int			ttc_ntoastidxs; /* # of toast indexes for toast table */
+	Relation   *ttc_toastidxs;	/* array of those toast indexes */
+	int			ttc_validtoastidx;	/* the valid toast index */
+
 	/*
 	 * Before calling toast_tuple_init, the caller should set tts_attr to
 	 * point to an array of ToastAttrInfo structures of a length equal to
@@ -106,10 +118,10 @@ extern int	toast_tuple_find_biggest_attribute(ToastTupleContext *ttc,
 											   bool check_main);
 extern void toast_tuple_try_compression(ToastTupleContext *ttc, int attribute);
 extern void toast_tuple_externalize(ToastTupleContext *ttc, int attribute,
-									int options);
-extern void toast_tuple_cleanup(ToastTupleContext *ttc);
+									int options, int max_chunk_size);
+extern void toast_tuple_cleanup(ToastTupleContext *ttc, bool cleanup_toastrel);
 
 extern void toast_delete_external(Relation rel, Datum *values, bool *isnull,
-								  bool is_speculative);
+								  bool is_speculative, uint32 specToken);
 
 #endif
diff --git a/src/include/access/toast_internals.h b/src/include/access/toast_internals.h
index 9bd1c97771..eb6137dfab 100644
--- a/src/include/access/toast_internals.h
+++ b/src/include/access/toast_internals.h
@@ -16,6 +16,8 @@
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
 
+struct TupleTableSlot;
+
 /*
  *	The information at the start of the compressed toast data.
  */
@@ -40,9 +42,16 @@ typedef struct toast_compress_header
 extern Datum toast_compress_datum(Datum value);
 extern Oid	toast_get_valid_index(Oid toastoid, LOCKMODE lock);
 
-extern void toast_delete_datum(Relation rel, Datum value, bool is_speculative);
-extern Datum toast_save_datum(Relation rel, Datum value,
-							  struct varlena *oldexternal, int options);
+extern void toast_delete_datum(Relation toastrel, int num_indexes,
+							   Relation *toastidxs, int validIndex,
+							   Datum value, bool is_speculative,
+							   uint32 specToken);
+extern Datum toast_save_datum(Relation toastrel,
+							  struct TupleTableSlot *toastslot,
+							  int num_indexes, Relation *toastidxs,
+							  int validIndex, Oid toastoid,
+							  Datum value, struct varlena *oldexternal,
+							  int options, int max_chunk_size);
 
 extern int	toast_open_indexes(Relation toastrel,
 							   LOCKMODE lock,
-- 
2.17.2 (Apple Git-113)

