On 22/10/2020 14:48, Heikki Linnakangas wrote:
On 11/09/2017 13:37, Tomas Vondra wrote:
I planned to do some benchmarking on this patch, but apparently the
patch no longer applies. Rebase please?

Here's a rebase of this. Sorry to keep you waiting :-).

Here's an updated version that fixes one bug:

The CFBot was reporting a failure on the FreeBSD system [1]. It turned out to be an out-of-memory issue caused by an underflow bug in the calculation of the tape read buffer size. With a small work_mem size, the memory left for tape buffers was negative, and that wrapped around to a very large number. I believe that was not caught by the other systems, because the other ones had enough memory for the incorrectly-sized buffers anyway. That was the case on my laptop at least. It did cause a big slowdown in the 'tuplesort' regression test though, which I hadn't noticed.

The fix for that bug is here as a separate patch for easier review, but I'll squash it before committing.

[1] https://cirrus-ci.com/task/6699842091089920

- Heikki

From 8fd75cfc8574e3a5885f88f13f11b0676c99f39f Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Wed, 21 Oct 2020 21:57:42 +0300
Subject: [PATCH v3 1/3] Refactor LogicalTapeSet/LogicalTape interface.

All the tape functions, like LogicalTapeRead and LogicalTapeWrite, now
take a LogicalTape as argument, instead of LogicalTapeSet+tape number.
You can create any number of LogicalTapes in a single LogicalTapeSet, and
you don't need to decide the number upfront, when you create the tape set.

This makes the tape management in hash agg spilling in nodeAgg.c simpler.
---
 src/backend/executor/nodeAgg.c     | 187 ++++--------
 src/backend/utils/sort/logtape.c   | 456 ++++++++++++-----------------
 src/backend/utils/sort/tuplesort.c | 229 +++++++--------
 src/include/nodes/execnodes.h      |   3 +-
 src/include/utils/logtape.h        |  37 ++-
 5 files changed, 360 insertions(+), 552 deletions(-)

diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 601b6dab03f..13f6f188668 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -208,7 +208,16 @@
  *
  *	  Spilled data is written to logical tapes. These provide better control
  *	  over memory usage, disk space, and the number of files than if we were
- *	  to use a BufFile for each spill.
+ *	  to use a BufFile for each spill.  We don't know the number of tapes needed
+ *	  at the start of the algorithm (because it can recurse), so a tape set is
+ *	  allocated at the beginning, and individual tapes are created as needed.
+ *	  As a particular tape is read, logtape.c recycles its disk space. When a
+ *	  tape is read to completion, it is destroyed entirely.
+ *
+ *	  Tapes' buffers can take up substantial memory when many tapes are open at
+ *	  once. We only need one tape open at a time in read mode (using a buffer
+ *	  that's a multiple of BLCKSZ); but we need one tape open in write mode (each
+ *	  requiring a buffer of size BLCKSZ) for each partition.
  *
  *	  Note that it's possible for transition states to start small but then
  *	  grow very large; for instance in the case of ARRAY_AGG. In such cases,
@@ -311,27 +320,6 @@
  */
 #define CHUNKHDRSZ 16
 
-/*
- * Track all tapes needed for a HashAgg that spills. We don't know the maximum
- * number of tapes needed at the start of the algorithm (because it can
- * recurse), so one tape set is allocated and extended as needed for new
- * tapes. When a particular tape is already read, rewind it for write mode and
- * put it in the free list.
- *
- * Tapes' buffers can take up substantial memory when many tapes are open at
- * once. We only need one tape open at a time in read mode (using a buffer
- * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
- * requiring a buffer of size BLCKSZ) for each partition.
- */
-typedef struct HashTapeInfo
-{
-	LogicalTapeSet *tapeset;
-	int			ntapes;
-	int		   *freetapes;
-	int			nfreetapes;
-	int			freetapes_alloc;
-} HashTapeInfo;
-
 /*
  * Represents partitioned spill data for a single hashtable. Contains the
  * necessary information to route tuples to the correct partition, and to
@@ -343,9 +331,8 @@ typedef struct HashTapeInfo
  */
 typedef struct HashAggSpill
 {
-	LogicalTapeSet *tapeset;	/* borrowed reference to tape set */
 	int			npartitions;	/* number of partitions */
-	int		   *partitions;		/* spill partition tape numbers */
+	LogicalTape **partitions;	/* spill partition tapes */
 	int64	   *ntuples;		/* number of tuples in each partition */
 	uint32		mask;			/* mask to find partition from hash value */
 	int			shift;			/* after masking, shift by this amount */
@@ -365,8 +352,7 @@ typedef struct HashAggBatch
 {
 	int			setno;			/* grouping set */
 	int			used_bits;		/* number of bits of hash already used */
-	LogicalTapeSet *tapeset;	/* borrowed reference to tape set */
-	int			input_tapenum;	/* input partition tape */
+	LogicalTape *input_tape;	/* input partition tape */
 	int64		input_tuples;	/* number of tuples in this batch */
 	double		input_card;		/* estimated group cardinality */
 } HashAggBatch;
@@ -442,22 +428,17 @@ static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
 									int npartitions);
 static void hashagg_finish_initial_spills(AggState *aggstate);
 static void hashagg_reset_spill_state(AggState *aggstate);
-static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
-									   int input_tapenum, int setno,
+static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno,
 									   int64 input_tuples, double input_card,
 									   int used_bits);
 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
-static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
+static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *lts,
 							   int used_bits, double input_groups,
 							   double hashentrysize);
 static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 								TupleTableSlot *slot, uint32 hash);
 static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
 								 int setno);
-static void hashagg_tapeinfo_init(AggState *aggstate);
-static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
-									int ndest);
-static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
 static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
 									  AggState *aggstate, EState *estate,
@@ -1879,12 +1860,12 @@ hash_agg_enter_spill_mode(AggState *aggstate)
 
 	if (!aggstate->hash_ever_spilled)
 	{
-		Assert(aggstate->hash_tapeinfo == NULL);
+		Assert(aggstate->hash_tapeset == NULL);
 		Assert(aggstate->hash_spills == NULL);
 
 		aggstate->hash_ever_spilled = true;
 
-		hashagg_tapeinfo_init(aggstate);
+		aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1);
 
 		aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);
 
@@ -1893,7 +1874,7 @@ hash_agg_enter_spill_mode(AggState *aggstate)
 			AggStatePerHash perhash = &aggstate->perhash[setno];
 			HashAggSpill *spill = &aggstate->hash_spills[setno];
 
-			hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+			hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
 							   perhash->aggnode->numGroups,
 							   aggstate->hashentrysize);
 		}
@@ -1935,9 +1916,9 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
 		aggstate->hash_mem_peak = total_mem;
 
 	/* update disk usage */
-	if (aggstate->hash_tapeinfo != NULL)
+	if (aggstate->hash_tapeset != NULL)
 	{
-		uint64		disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);
+		uint64		disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024);
 
 		if (aggstate->hash_disk_used < disk_used)
 			aggstate->hash_disk_used = disk_used;
@@ -2121,7 +2102,7 @@ lookup_hash_entries(AggState *aggstate)
 			TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;
 
 			if (spill->partitions == NULL)
-				hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
+				hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
 								   perhash->aggnode->numGroups,
 								   aggstate->hashentrysize);
 
@@ -2586,7 +2567,7 @@ agg_refill_hash_table(AggState *aggstate)
 	HashAggBatch *batch;
 	AggStatePerHash perhash;
 	HashAggSpill spill;
-	HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
+	LogicalTapeSet *tapeset = aggstate->hash_tapeset;
 	bool		spill_initialized = false;
 
 	if (aggstate->hash_batches == NIL)
@@ -2682,7 +2663,7 @@ agg_refill_hash_table(AggState *aggstate)
 				 * that we don't assign tapes that will never be used.
 				 */
 				spill_initialized = true;
-				hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
+				hashagg_spill_init(&spill, tapeset, batch->used_bits,
 								   batch->input_card, aggstate->hashentrysize);
 			}
 			/* no memory for a new group, spill */
@@ -2698,7 +2679,7 @@ agg_refill_hash_table(AggState *aggstate)
 		ResetExprContext(aggstate->tmpcontext);
 	}
 
-	hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);
+	LogicalTapeClose(batch->input_tape);
 
 	/* change back to phase 0 */
 	aggstate->current_phase = 0;
@@ -2873,67 +2854,6 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate)
 	return NULL;
 }
 
-/*
- * Initialize HashTapeInfo
- */
-static void
-hashagg_tapeinfo_init(AggState *aggstate)
-{
-	HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
-	int			init_tapes = 16;	/* expanded dynamically */
-
-	tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
-	tapeinfo->ntapes = init_tapes;
-	tapeinfo->nfreetapes = init_tapes;
-	tapeinfo->freetapes_alloc = init_tapes;
-	tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
-	for (int i = 0; i < init_tapes; i++)
-		tapeinfo->freetapes[i] = i;
-
-	aggstate->hash_tapeinfo = tapeinfo;
-}
-
-/*
- * Assign unused tapes to spill partitions, extending the tape set if
- * necessary.
- */
-static void
-hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
-						int npartitions)
-{
-	int			partidx = 0;
-
-	/* use free tapes if available */
-	while (partidx < npartitions && tapeinfo->nfreetapes > 0)
-		partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];
-
-	if (partidx < npartitions)
-	{
-		LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);
-
-		while (partidx < npartitions)
-			partitions[partidx++] = tapeinfo->ntapes++;
-	}
-}
-
-/*
- * After a tape has already been written to and then read, this function
- * rewinds it for writing and adds it to the free list.
- */
-static void
-hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
-{
-	/* rewinding frees the buffer while not in use */
-	LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
-	if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
-	{
-		tapeinfo->freetapes_alloc <<= 1;
-		tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
-									   tapeinfo->freetapes_alloc * sizeof(int));
-	}
-	tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
-}
-
 /*
  * hashagg_spill_init
  *
@@ -2941,7 +2861,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
  * of partitions to create, and initializes them.
  */
 static void
-hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
+hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
 				   double input_groups, double hashentrysize)
 {
 	int			npartitions;
@@ -2950,13 +2870,13 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
 	npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
 											 used_bits, &partition_bits);
 
-	spill->partitions = palloc0(sizeof(int) * npartitions);
+	spill->partitions = palloc0(sizeof(LogicalTape *) * npartitions);
 	spill->ntuples = palloc0(sizeof(int64) * npartitions);
 	spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
 
-	hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
+	for (int i = 0; i < npartitions; i++)
+		spill->partitions[i] = LogicalTapeCreate(tapeset);
 
-	spill->tapeset = tapeinfo->tapeset;
 	spill->shift = 32 - used_bits - partition_bits;
 	spill->mask = (npartitions - 1) << spill->shift;
 	spill->npartitions = npartitions;
@@ -2975,11 +2895,10 @@ static Size
 hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 					TupleTableSlot *inputslot, uint32 hash)
 {
-	LogicalTapeSet *tapeset = spill->tapeset;
 	TupleTableSlot *spillslot;
 	int			partition;
 	MinimalTuple tuple;
-	int			tapenum;
+	LogicalTape *tape;
 	int			total_written = 0;
 	bool		shouldFree;
 
@@ -3018,12 +2937,12 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 	 */
 	addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
 
-	tapenum = spill->partitions[partition];
+	tape = spill->partitions[partition];
 
-	LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
+	LogicalTapeWrite(tape, (void *) &hash, sizeof(uint32));
 	total_written += sizeof(uint32);
 
-	LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
+	LogicalTapeWrite(tape, (void *) tuple, tuple->t_len);
 	total_written += tuple->t_len;
 
 	if (shouldFree)
@@ -3039,15 +2958,14 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
  * be done.
  */
 static HashAggBatch *
-hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
+hashagg_batch_new(LogicalTape *input_tape, int setno,
 				  int64 input_tuples, double input_card, int used_bits)
 {
 	HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
 
 	batch->setno = setno;
 	batch->used_bits = used_bits;
-	batch->tapeset = tapeset;
-	batch->input_tapenum = tapenum;
+	batch->input_tape = input_tape;
 	batch->input_tuples = input_tuples;
 	batch->input_card = input_card;
 
@@ -3061,42 +2979,41 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
 static MinimalTuple
 hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
 {
-	LogicalTapeSet *tapeset = batch->tapeset;
-	int			tapenum = batch->input_tapenum;
+	LogicalTape *tape = batch->input_tape;
 	MinimalTuple tuple;
 	uint32		t_len;
 	size_t		nread;
 	uint32		hash;
 
-	nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
+	nread = LogicalTapeRead(tape, &hash, sizeof(uint32));
 	if (nread == 0)
 		return NULL;
 	if (nread != sizeof(uint32))
 		ereport(ERROR,
 				(errcode_for_file_access(),
-				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
-						tapenum, sizeof(uint32), nread)));
+				 errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+						tape, sizeof(uint32), nread)));
 	if (hashp != NULL)
 		*hashp = hash;
 
-	nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
+	nread = LogicalTapeRead(tape, &t_len, sizeof(t_len));
 	if (nread != sizeof(uint32))
 		ereport(ERROR,
 				(errcode_for_file_access(),
-				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
-						tapenum, sizeof(uint32), nread)));
+				 errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+						tape, sizeof(uint32), nread)));
 
 	tuple = (MinimalTuple) palloc(t_len);
 	tuple->t_len = t_len;
 
-	nread = LogicalTapeRead(tapeset, tapenum,
+	nread = LogicalTapeRead(tape,
 							(void *) ((char *) tuple + sizeof(uint32)),
 							t_len - sizeof(uint32));
 	if (nread != t_len - sizeof(uint32))
 		ereport(ERROR,
 				(errcode_for_file_access(),
-				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
-						tapenum, t_len - sizeof(uint32), nread)));
+				 errmsg("unexpected EOF for tape %p: requested %zu bytes, read %zu bytes",
+						tape, t_len - sizeof(uint32), nread)));
 
 	return tuple;
 }
@@ -3153,8 +3070,7 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
 
 	for (i = 0; i < spill->npartitions; i++)
 	{
-		LogicalTapeSet	*tapeset = aggstate->hash_tapeinfo->tapeset;
-		int				 tapenum = spill->partitions[i];
+		LogicalTape *tape = spill->partitions[i];
 		HashAggBatch	*new_batch;
 		double			 cardinality;
 
@@ -3166,10 +3082,9 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
 		freeHyperLogLog(&spill->hll_card[i]);
 
 		/* rewinding frees the buffer while not in use */
-		LogicalTapeRewindForRead(tapeset, tapenum,
-								 HASHAGG_READ_BUFFER_SIZE);
+		LogicalTapeRewindForRead(tape, HASHAGG_READ_BUFFER_SIZE);
 
-		new_batch = hashagg_batch_new(tapeset, tapenum, setno,
+		new_batch = hashagg_batch_new(tape, setno,
 									  spill->ntuples[i], cardinality,
 									  used_bits);
 		aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
@@ -3216,14 +3131,10 @@ hashagg_reset_spill_state(AggState *aggstate)
 	aggstate->hash_batches = NIL;
 
 	/* close tape set */
-	if (aggstate->hash_tapeinfo != NULL)
+	if (aggstate->hash_tapeset != NULL)
 	{
-		HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
-
-		LogicalTapeSetClose(tapeinfo->tapeset);
-		pfree(tapeinfo->freetapes);
-		pfree(tapeinfo);
-		aggstate->hash_tapeinfo = NULL;
+		LogicalTapeSetClose(aggstate->hash_tapeset);
+		aggstate->hash_tapeset = NULL;
 	}
 }
 
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 089ba2e1068..82812716c2d 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -9,8 +9,7 @@
  * there is an annoying problem: the peak space usage is at least twice
  * the volume of actual data to be sorted.  (This must be so because each
  * datum will appear in both the input and output tapes of the final
- * merge pass.  For seven-tape polyphase merge, which is otherwise a
- * pretty good algorithm, peak usage is more like 4x actual data volume.)
+ * merge pass.)
  *
  * We can work around this problem by recognizing that any one tape
  * dataset (with the possible exception of the final output) is written
@@ -137,6 +136,8 @@ typedef struct TapeBlockTrailer
  */
 typedef struct LogicalTape
 {
+	LogicalTapeSet *tapeSet;	/* tape set this tape is part of */
+
 	bool		writing;		/* T while in write phase */
 	bool		frozen;			/* T if blocks should not be freed when read */
 	bool		dirty;			/* does buffer need to be written? */
@@ -180,11 +181,14 @@ typedef struct LogicalTape
  * This data structure represents a set of related "logical tapes" sharing
  * space in a single underlying file.  (But that "file" may be multiple files
  * if needed to escape OS limits on file size; buffile.c handles that for us.)
- * The number of tapes is fixed at creation.
+ * Tapes belonging to a tape set can be created and destroyed on-the-fly, on
+ * demand.
  */
 struct LogicalTapeSet
 {
 	BufFile    *pfile;			/* underlying file for whole tape set */
+	SharedFileSet *fileset;
+	int			worker;			/* worker # if shared, -1 for leader/serial */
 
 	/*
 	 * File size tracking.  nBlocksWritten is the size of the underlying file,
@@ -213,22 +217,16 @@ struct LogicalTapeSet
 	long		nFreeBlocks;	/* # of currently free blocks */
 	Size		freeBlocksLen;	/* current allocated length of freeBlocks[] */
 	bool		enable_prealloc;	/* preallocate write blocks? */
-
-	/* The array of logical tapes. */
-	int			nTapes;			/* # of logical tapes in set */
-	LogicalTape *tapes;			/* has nTapes nentries */
 };
 
+static LogicalTape *ltsCreateTape(LogicalTapeSet *lts);
 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
 static long ltsGetBlock(LogicalTapeSet *lts, LogicalTape *lt);
 static long ltsGetFreeBlock(LogicalTapeSet *lts);
 static long ltsGetPreallocBlock(LogicalTapeSet *lts, LogicalTape *lt);
 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
-static void ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
-								 SharedFileSet *fileset);
-static void ltsInitTape(LogicalTape *lt);
-static void ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt);
+static void ltsInitReadBuffer(LogicalTape *lt);
 
 
 /*
@@ -304,7 +302,7 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
  * Returns true if anything was read, 'false' on EOF.
  */
 static bool
-ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+ltsReadFillBuffer(LogicalTape *lt)
 {
 	lt->pos = 0;
 	lt->nbytes = 0;
@@ -321,9 +319,9 @@ ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt)
 		datablocknum += lt->offsetBlockNumber;
 
 		/* Read the block */
-		ltsReadBlock(lts, datablocknum, (void *) thisbuf);
+		ltsReadBlock(lt->tapeSet, datablocknum, (void *) thisbuf);
 		if (!lt->frozen)
-			ltsReleaseBlock(lts, datablocknum);
+			ltsReleaseBlock(lt->tapeSet, datablocknum);
 		lt->curBlockNumber = lt->nextBlockNumber;
 
 		lt->nbytes += TapeBlockGetNBytes(thisbuf);
@@ -530,126 +528,13 @@ ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
 	}
 }
 
-/*
- * Claim ownership of a set of logical tapes from existing shared BufFiles.
- *
- * Caller should be leader process.  Though tapes are marked as frozen in
- * workers, they are not frozen when opened within leader, since unfrozen tapes
- * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
- * for random access.)
- */
-static void
-ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
-					 SharedFileSet *fileset)
-{
-	LogicalTape *lt = NULL;
-	long		tapeblocks = 0L;
-	long		nphysicalblocks = 0L;
-	int			i;
-
-	/* Should have at least one worker tape, plus leader's tape */
-	Assert(lts->nTapes >= 2);
-
-	/*
-	 * Build concatenated view of all BufFiles, remembering the block number
-	 * where each source file begins.  No changes are needed for leader/last
-	 * tape.
-	 */
-	for (i = 0; i < lts->nTapes - 1; i++)
-	{
-		char		filename[MAXPGPATH];
-		BufFile    *file;
-		int64		filesize;
-
-		lt = &lts->tapes[i];
-
-		pg_itoa(i, filename);
-		file = BufFileOpenShared(fileset, filename, O_RDONLY);
-		filesize = BufFileSize(file);
-
-		/*
-		 * Stash first BufFile, and concatenate subsequent BufFiles to that.
-		 * Store block offset into each tape as we go.
-		 */
-		lt->firstBlockNumber = shared[i].firstblocknumber;
-		if (i == 0)
-		{
-			lts->pfile = file;
-			lt->offsetBlockNumber = 0L;
-		}
-		else
-		{
-			lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
-		}
-		/* Don't allocate more for read buffer than could possibly help */
-		lt->max_size = Min(MaxAllocSize, filesize);
-		tapeblocks = filesize / BLCKSZ;
-		nphysicalblocks += tapeblocks;
-	}
-
-	/*
-	 * Set # of allocated blocks, as well as # blocks written.  Use extent of
-	 * new BufFile space (from 0 to end of last worker's tape space) for this.
-	 * Allocated/written blocks should include space used by holes left
-	 * between concatenated BufFiles.
-	 */
-	lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
-	lts->nBlocksWritten = lts->nBlocksAllocated;
-
-	/*
-	 * Compute number of hole blocks so that we can later work backwards, and
-	 * instrument number of physical blocks.  We don't simply use physical
-	 * blocks directly for instrumentation because this would break if we ever
-	 * subsequently wrote to the leader tape.
-	 *
-	 * Working backwards like this keeps our options open.  If shared BufFiles
-	 * ever support being written to post-export, logtape.c can automatically
-	 * take advantage of that.  We'd then support writing to the leader tape
-	 * while recycling space from worker tapes, because the leader tape has a
-	 * zero offset (write routines won't need to have extra logic to apply an
-	 * offset).
-	 *
-	 * The only thing that currently prevents writing to the leader tape from
-	 * working is the fact that BufFiles opened using BufFileOpenShared() are
-	 * read-only by definition, but that could be changed if it seemed
-	 * worthwhile.  For now, writing to the leader tape will raise a "Bad file
-	 * descriptor" error, so tuplesort must avoid writing to the leader tape
-	 * altogether.
-	 */
-	lts->nHoleBlocks = lts->nBlocksAllocated - nphysicalblocks;
-}
-
-/*
- * Initialize per-tape struct.  Note we allocate the I/O buffer lazily.
- */
-static void
-ltsInitTape(LogicalTape *lt)
-{
-	lt->writing = true;
-	lt->frozen = false;
-	lt->dirty = false;
-	lt->firstBlockNumber = -1L;
-	lt->curBlockNumber = -1L;
-	lt->nextBlockNumber = -1L;
-	lt->offsetBlockNumber = 0L;
-	lt->buffer = NULL;
-	lt->buffer_size = 0;
-	/* palloc() larger than MaxAllocSize would fail */
-	lt->max_size = MaxAllocSize;
-	lt->pos = 0;
-	lt->nbytes = 0;
-	lt->prealloc = NULL;
-	lt->nprealloc = 0;
-	lt->prealloc_size = 0;
-}
-
 /*
  * Lazily allocate and initialize the read buffer. This avoids waste when many
  * tapes are open at once, but not all are active between rewinding and
  * reading.
  */
 static void
-ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+ltsInitReadBuffer(LogicalTape *lt)
 {
 	Assert(lt->buffer_size > 0);
 	lt->buffer = palloc(lt->buffer_size);
@@ -658,40 +543,32 @@ ltsInitReadBuffer(LogicalTapeSet *lts, LogicalTape *lt)
 	lt->nextBlockNumber = lt->firstBlockNumber;
 	lt->pos = 0;
 	lt->nbytes = 0;
-	ltsReadFillBuffer(lts, lt);
+	ltsReadFillBuffer(lt);
 }
 
 /*
- * Create a set of logical tapes in a temporary underlying file.
+ * Create a tape set, backed by a temporary underlying file.
  *
- * Each tape is initialized in write state.  Serial callers pass ntapes,
- * NULL argument for shared, and -1 for worker.  Parallel worker callers
- * pass ntapes, a shared file handle, NULL shared argument,  and their own
- * worker number.  Leader callers, which claim shared worker tapes here,
- * must supply non-sentinel values for all arguments except worker number,
- * which should be -1.
+ * The tape set is initially empty. Use LogicalTapeCreate() to create
+ * tapes in it.
  *
- * Leader caller is passing back an array of metadata each worker captured
- * when LogicalTapeFreeze() was called for their final result tapes.  Passed
- * tapes array is actually sized ntapes - 1, because it includes only
- * worker tapes, whereas leader requires its own leader tape.  Note that we
- * rely on the assumption that reclaimed worker tapes will only be read
- * from once by leader, and never written to again (tapes are initialized
- * for writing, but that's only to be consistent).  Leader may not write to
- * its own tape purely due to a restriction in the shared buffile
- * infrastructure that may be lifted in the future.
+ * Serial callers pass NULL argument for shared, and -1 for worker.  Parallel
+ * worker callers pass a shared file handle and their own worker number.
+ *
+ * Leader callers pass a shared file handle and -1 for worker. After creating
+ * the tape set, use LogicalTapeImport() to import the worker tapes into it.
+ *
+ * Currently, the leader will only import worker tapes into the set, it does
+ * not create tapes of its own, although in principle that should work.
  */
 LogicalTapeSet *
-LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
-					 SharedFileSet *fileset, int worker)
+LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
 {
 	LogicalTapeSet *lts;
-	int			i;
 
 	/*
 	 * Create top-level struct including per-tape LogicalTape structs.
 	 */
-	Assert(ntapes > 0);
 	lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
 	lts->nBlocksAllocated = 0L;
 	lts->nBlocksWritten = 0L;
@@ -701,22 +578,21 @@ LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
 	lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
 	lts->nFreeBlocks = 0;
 	lts->enable_prealloc = preallocate;
-	lts->nTapes = ntapes;
-	lts->tapes = (LogicalTape *) palloc(ntapes * sizeof(LogicalTape));
 
-	for (i = 0; i < ntapes; i++)
-		ltsInitTape(&lts->tapes[i]);
+	lts->fileset = fileset;
+	lts->worker = worker;
 
 	/*
 	 * Create temp BufFile storage as required.
 	 *
-	 * Leader concatenates worker tapes, which requires special adjustment to
-	 * final tapeset data.  Things are simpler for the worker case and the
+	 * In leader, we hijack the BufFile of the first tape that's imported, and
+	 * concatenate the BufFiles of any subsequent tapes to that. Hence don't
+	 * create a BufFile here. Things are simpler for the worker case and the
 	 * serial case, though.  They are generally very similar -- workers use a
 	 * shared fileset, whereas serial sorts use a conventional serial BufFile.
 	 */
-	if (shared)
-		ltsConcatWorkerTapes(lts, shared, fileset);
+	if (fileset && worker == -1)
+		lts->pfile = NULL;
 	else if (fileset)
 	{
 		char		filename[MAXPGPATH];
@@ -730,27 +606,147 @@ LogicalTapeSetCreate(int ntapes, bool preallocate, TapeShare *shared,
 	return lts;
 }
 
+
 /*
- * Close a logical tape set and release all resources.
+ * Claim ownership of a logical tape from an existing shared BufFile.
+ *
+ * Caller should be leader process.  Though tapes are marked as frozen in
+ * workers, they are not frozen when opened within leader, since unfrozen tapes
+ * use a larger read buffer. (Frozen tapes have smaller read buffer, optimized
+ * for random access.)
  */
-void
-LogicalTapeSetClose(LogicalTapeSet *lts)
+LogicalTape *
+LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared)
 {
 	LogicalTape *lt;
-	int			i;
+	long		tapeblocks;
+	char		filename[MAXPGPATH];
+	BufFile    *file;
+	int64		filesize;
 
-	BufFileClose(lts->pfile);
-	for (i = 0; i < lts->nTapes; i++)
+	lt = ltsCreateTape(lts);
+
+	/*
+	 * build concatenated view of all buffiles, remembering the block number
+	 * where each source file begins.
+	 */
+	pg_itoa(worker, filename);
+	file = BufFileOpenShared(lts->fileset, filename, O_RDONLY);
+	filesize = BufFileSize(file);
+
+	/*
+	 * Stash first BufFile, and concatenate subsequent BufFiles to that. Store
+	 * block offset into each tape as we go.
+	 */
+	lt->firstBlockNumber = shared->firstblocknumber;
+	if (lts->pfile == NULL)
 	{
-		lt = &lts->tapes[i];
-		if (lt->buffer)
-			pfree(lt->buffer);
+		lts->pfile = file;
+		lt->offsetBlockNumber = 0L;
 	}
-	pfree(lts->tapes);
+	else
+	{
+		lt->offsetBlockNumber = BufFileAppend(lts->pfile, file);
+	}
+	/* Don't allocate more for read buffer than could possibly help */
+	lt->max_size = Min(MaxAllocSize, filesize);
+	tapeblocks = filesize / BLCKSZ;
+
+	/*
+	 * Update # of allocated blocks and # blocks written to reflect the
+	 * imported BufFile.  Allocated/written blocks include space used by holes
+	 * left between concatenated BufFiles.  Also track the number of hole
+	 * blocks so that we can later work backwards to calculate the number of
+	 * physical blocks for instrumentation.
+	 */
+	lts->nHoleBlocks += lt->offsetBlockNumber - lts->nBlocksAllocated;
+
+	lts->nBlocksAllocated = lt->offsetBlockNumber + tapeblocks;
+	lts->nBlocksWritten = lts->nBlocksAllocated;
+
+	return lt;
+}
+
+/*
+ * Close a logical tape set and release all resources.
+ *
+ * NOTE: This doesn't close any of the tapes!  You must close them
+ * first, or you can let them be destroyed along with the memory context.
+ */
+void
+LogicalTapeSetClose(LogicalTapeSet *lts)
+{
+	BufFileClose(lts->pfile);
 	pfree(lts->freeBlocks);
 	pfree(lts);
 }
 
+/*
+ * Create a logical tape in the given tapeset.
+ *
+ * The tape is initialized in write state.
+ */
+LogicalTape *
+LogicalTapeCreate(LogicalTapeSet *lts)
+{
+	/*
+	 * The only thing that currently prevents creating new tapes in leader is
+	 * the fact that BufFiles opened using BufFileOpenShared() are read-only
+	 * by definition, but that could be changed if it seemed worthwhile.  For
+	 * now, writing to the leader tape will raise a "Bad file descriptor"
+	 * error, so tuplesort must avoid writing to the leader tape altogether.
+	 */
+	if (lts->fileset && lts->worker == -1)
+		elog(ERROR, "cannot create new tapes in leader process");
+
+	return ltsCreateTape(lts);
+}
+
+static LogicalTape *
+ltsCreateTape(LogicalTapeSet *lts)
+{
+	LogicalTape *lt;
+
+	/*
+	 * Create per-tape struct.  Note we allocate the I/O buffer lazily.
+	 */
+	lt = palloc(sizeof(LogicalTape));
+	lt->tapeSet = lts;
+	lt->writing = true;
+	lt->frozen = false;
+	lt->dirty = false;
+	lt->firstBlockNumber = -1L;
+	lt->curBlockNumber = -1L;
+	lt->nextBlockNumber = -1L;
+	lt->offsetBlockNumber = 0L;
+	lt->buffer = NULL;
+	lt->buffer_size = 0;
+	/* palloc() larger than MaxAllocSize would fail */
+	lt->max_size = MaxAllocSize;
+	lt->pos = 0;
+	lt->nbytes = 0;
+	lt->prealloc = NULL;
+	lt->nprealloc = 0;
+	lt->prealloc_size = 0;
+
+	return lt;
+}
+
+/*
+ * Close a logical tape.
+ *
+ * Note: This doesn't return any blocks to the free list!  You must read
+ * the tape to the end first, to reuse the space.  In current use, though,
+ * we only close tapes after fully reading them.
+ */
+void
+LogicalTapeClose(LogicalTape *lt)
+{
+	if (lt->buffer)
+		pfree(lt->buffer);
+	pfree(lt);
+}
+
 /*
  * Mark a logical tape set as not needing management of free space anymore.
  *
@@ -772,14 +768,11 @@ LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
  * There are no error returns; we ereport() on failure.
  */
 void
-LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
-				 void *ptr, size_t size)
+LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size)
 {
-	LogicalTape *lt;
+	LogicalTapeSet *lts = lt->tapeSet;
 	size_t		nthistime;
 
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(lt->writing);
 	Assert(lt->offsetBlockNumber == 0L);
 
@@ -818,11 +811,11 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 			 * First allocate the next block, so that we can store it in the
 			 * 'next' pointer of this block.
 			 */
-			nextBlockNumber = ltsGetBlock(lts, lt);
+			nextBlockNumber = ltsGetBlock(lt->tapeSet, lt);
 
 			/* set the next-pointer and dump the current block. */
 			TapeBlockGetTrailer(lt->buffer)->next = nextBlockNumber;
-			ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+			ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
 
 			/* initialize the prev-pointer of the next block */
 			TapeBlockGetTrailer(lt->buffer)->prev = lt->curBlockNumber;
@@ -860,12 +853,9 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
  * byte buffer is used.
  */
 void
-LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
+LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size)
 {
-	LogicalTape *lt;
-
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
+	LogicalTapeSet *lts = lt->tapeSet;
 
 	/*
 	 * Round and cap buffer_size if needed.
@@ -907,7 +897,7 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
 									  lt->buffer_size - lt->nbytes);
 
 			TapeBlockSetNBytes(lt->buffer, lt->nbytes);
-			ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+			ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
 		}
 		lt->writing = false;
 	}
@@ -939,61 +929,28 @@ LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, size_t buffer_size)
 	}
 }
 
-/*
- * Rewind logical tape and switch from reading to writing.
- *
- * NOTE: we assume the caller has read the tape to the end; otherwise
- * untouched data will not have been freed. We could add more code to free
- * any unread blocks, but in current usage of this module it'd be useless
- * code.
- */
-void
-LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum)
-{
-	LogicalTape *lt;
-
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
-
-	Assert(!lt->writing && !lt->frozen);
-	lt->writing = true;
-	lt->dirty = false;
-	lt->firstBlockNumber = -1L;
-	lt->curBlockNumber = -1L;
-	lt->pos = 0;
-	lt->nbytes = 0;
-	if (lt->buffer)
-		pfree(lt->buffer);
-	lt->buffer = NULL;
-	lt->buffer_size = 0;
-}
-
 /*
  * Read from a logical tape.
  *
  * Early EOF is indicated by return value less than #bytes requested.
  */
 size_t
-LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
-				void *ptr, size_t size)
+LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
 {
-	LogicalTape *lt;
 	size_t		nread = 0;
 	size_t		nthistime;
 
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(!lt->writing);
 
 	if (lt->buffer == NULL)
-		ltsInitReadBuffer(lts, lt);
+		ltsInitReadBuffer(lt);
 
 	while (size > 0)
 	{
 		if (lt->pos >= lt->nbytes)
 		{
 			/* Try to load more data into buffer. */
-			if (!ltsReadFillBuffer(lts, lt))
+			if (!ltsReadFillBuffer(lt))
 				break;			/* EOF */
 		}
 
@@ -1031,12 +988,10 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
  * Serial sorts should set share to NULL.
  */
 void
-LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
+LogicalTapeFreeze(LogicalTape *lt, TapeShare *share)
 {
-	LogicalTape *lt;
+	LogicalTapeSet *lts = lt->tapeSet;
 
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(lt->writing);
 	Assert(lt->offsetBlockNumber == 0L);
 
@@ -1058,8 +1013,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
 								  lt->buffer_size - lt->nbytes);
 
 		TapeBlockSetNBytes(lt->buffer, lt->nbytes);
-		ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
-		lt->writing = false;
+		ltsWriteBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
 	}
 	lt->writing = false;
 	lt->frozen = true;
@@ -1086,7 +1040,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
 
 	if (lt->firstBlockNumber == -1L)
 		lt->nextBlockNumber = -1L;
-	ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+	ltsReadBlock(lt->tapeSet, lt->curBlockNumber, (void *) lt->buffer);
 	if (TapeBlockIsLast(lt->buffer))
 		lt->nextBlockNumber = -1L;
 	else
@@ -1101,25 +1055,6 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum, TapeShare *share)
 	}
 }
 
-/*
- * Add additional tapes to this tape set. Not intended to be used when any
- * tapes are frozen.
- */
-void
-LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
-{
-	int			i;
-	int			nTapesOrig = lts->nTapes;
-
-	lts->nTapes += nAdditional;
-
-	lts->tapes = (LogicalTape *) repalloc(lts->tapes,
-										  lts->nTapes * sizeof(LogicalTape));
-
-	for (i = nTapesOrig; i < lts->nTapes; i++)
-		ltsInitTape(&lts->tapes[i]);
-}
-
 /*
  * Backspace the tape a given number of bytes.  (We also support a more
  * general seek interface, see below.)
@@ -1134,18 +1069,15 @@ LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional)
  * that case.
  */
 size_t
-LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
+LogicalTapeBackspace(LogicalTape *lt, size_t size)
 {
-	LogicalTape *lt;
 	size_t		seekpos = 0;
 
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(lt->frozen);
 	Assert(lt->buffer_size == BLCKSZ);
 
 	if (lt->buffer == NULL)
-		ltsInitReadBuffer(lts, lt);
+		ltsInitReadBuffer(lt);
 
 	/*
 	 * Easy case for seek within current block.
@@ -1175,7 +1107,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
 			return seekpos;
 		}
 
-		ltsReadBlock(lts, prev, (void *) lt->buffer);
+		ltsReadBlock(lt->tapeSet, prev, (void *) lt->buffer);
 
 		if (TapeBlockGetTrailer(lt->buffer)->next != lt->curBlockNumber)
 			elog(ERROR, "broken tape, next of block %ld is %ld, expected %ld",
@@ -1208,23 +1140,18 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
  * LogicalTapeTell().
  */
 void
-LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
-				long blocknum, int offset)
+LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset)
 {
-	LogicalTape *lt;
-
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(lt->frozen);
 	Assert(offset >= 0 && offset <= TapeBlockPayloadSize);
 	Assert(lt->buffer_size == BLCKSZ);
 
 	if (lt->buffer == NULL)
-		ltsInitReadBuffer(lts, lt);
+		ltsInitReadBuffer(lt);
 
 	if (blocknum != lt->curBlockNumber)
 	{
-		ltsReadBlock(lts, blocknum, (void *) lt->buffer);
+		ltsReadBlock(lt->tapeSet, blocknum, (void *) lt->buffer);
 		lt->curBlockNumber = blocknum;
 		lt->nbytes = TapeBlockPayloadSize;
 		lt->nextBlockNumber = TapeBlockGetTrailer(lt->buffer)->next;
@@ -1242,16 +1169,10 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
  * the position for a seek after freezing.  Not clear if anyone needs that.
  */
 void
-LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
-				long *blocknum, int *offset)
+LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset)
 {
-	LogicalTape *lt;
-
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
-
 	if (lt->buffer == NULL)
-		ltsInitReadBuffer(lts, lt);
+		ltsInitReadBuffer(lt);
 
 	Assert(lt->offsetBlockNumber == 0L);
 
@@ -1271,12 +1192,5 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
long
LogicalTapeSetBlocks(LogicalTapeSet *lts)
{
	/*
	 * Blocks actually written, less those the tape set accounts as holes
	 * (nHoleBlocks) — presumably unused space from preallocation; confirm
	 * against where nHoleBlocks is maintained.
	 */
	return lts->nBlocksWritten - lts->nHoleBlocks;
}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 7d0f96afb78..285f6127bc0 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -262,6 +262,7 @@ struct Tuplesortstate
 	MemoryContext sortcontext;	/* memory context holding most sort data */
 	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
+	LogicalTape **tapes;
 
 	/*
 	 * These function pointers decouple the routines that must know what kind
@@ -290,7 +291,7 @@ struct Tuplesortstate
 	 * SortTuple struct!), and increase state->availMem by the amount of
 	 * memory space thereby released.
 	 */
-	void		(*writetup) (Tuplesortstate *state, int tapenum,
+	void		(*writetup) (Tuplesortstate *state, LogicalTape *tape,
 							 SortTuple *stup);
 
 	/*
@@ -299,7 +300,7 @@ struct Tuplesortstate
 	 * from the slab memory arena, or is palloc'd, see readtup_alloc().
 	 */
 	void		(*readtup) (Tuplesortstate *state, SortTuple *stup,
-							int tapenum, unsigned int len);
+							LogicalTape *tape, unsigned int len);
 
 	/*
 	 * This array holds the tuples now in sort memory.  If we are in state
@@ -393,7 +394,7 @@ struct Tuplesortstate
 	 * the next tuple to return.  (In the tape case, the tape's current read
 	 * position is also critical state.)
 	 */
-	int			result_tape;	/* actual tape number of finished output */
+	LogicalTape *result_tape;	/* tape of finished output */
 	int			current;		/* array index (only used if SORTEDINMEM) */
 	bool		eof_reached;	/* reached EOF (needed for cursors) */
 
@@ -599,9 +600,9 @@ struct Sharedsort
  */
 
 /* When using this macro, beware of double evaluation of len */
-#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
+#define LogicalTapeReadExact(tape, ptr, len) \
 	do { \
-		if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
+		if (LogicalTapeRead(tape, ptr, len) != (size_t) (len)) \
 			elog(ERROR, "unexpected end of data"); \
 	} while(0)
 
@@ -619,7 +620,7 @@ static void init_slab_allocator(Tuplesortstate *state, int numSlots);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
 static void beginmerge(Tuplesortstate *state);
-static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup);
+static bool mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
 static void make_bounded_heap(Tuplesortstate *state);
 static void sort_bounded_heap(Tuplesortstate *state);
@@ -628,39 +629,39 @@ static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple);
 static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple);
 static void tuplesort_heap_delete_top(Tuplesortstate *state);
 static void reversedirection(Tuplesortstate *state);
-static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
-static void markrunend(Tuplesortstate *state, int tapenum);
+static unsigned int getlen(LogicalTape *tape, bool eofOK);
+static void markrunend(LogicalTape *tape);
 static void *readtup_alloc(Tuplesortstate *state, Size tuplen);
 static int	comparetup_heap(const SortTuple *a, const SortTuple *b,
 							Tuplesortstate *state);
 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_heap(Tuplesortstate *state, int tapenum,
+static void writetup_heap(Tuplesortstate *state, LogicalTape *tape,
 						  SortTuple *stup);
 static void readtup_heap(Tuplesortstate *state, SortTuple *stup,
-						 int tapenum, unsigned int len);
+						 LogicalTape *tape, unsigned int len);
 static int	comparetup_cluster(const SortTuple *a, const SortTuple *b,
 							   Tuplesortstate *state);
 static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_cluster(Tuplesortstate *state, int tapenum,
+static void writetup_cluster(Tuplesortstate *state, LogicalTape *tape,
 							 SortTuple *stup);
 static void readtup_cluster(Tuplesortstate *state, SortTuple *stup,
-							int tapenum, unsigned int len);
+							LogicalTape *tape, unsigned int len);
 static int	comparetup_index_btree(const SortTuple *a, const SortTuple *b,
 								   Tuplesortstate *state);
 static int	comparetup_index_hash(const SortTuple *a, const SortTuple *b,
 								  Tuplesortstate *state);
 static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_index(Tuplesortstate *state, int tapenum,
+static void writetup_index(Tuplesortstate *state, LogicalTape *tape,
 						   SortTuple *stup);
 static void readtup_index(Tuplesortstate *state, SortTuple *stup,
-						  int tapenum, unsigned int len);
+						  LogicalTape *tape, unsigned int len);
 static int	comparetup_datum(const SortTuple *a, const SortTuple *b,
 							 Tuplesortstate *state);
 static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup);
-static void writetup_datum(Tuplesortstate *state, int tapenum,
+static void writetup_datum(Tuplesortstate *state, LogicalTape *tape,
 						   SortTuple *stup);
 static void readtup_datum(Tuplesortstate *state, SortTuple *stup,
-						  int tapenum, unsigned int len);
+						  LogicalTape *tape, unsigned int len);
 static int	worker_get_identifier(Tuplesortstate *state);
 static void worker_freeze_result_tape(Tuplesortstate *state);
 static void worker_nomergeruns(Tuplesortstate *state);
@@ -869,7 +870,7 @@ tuplesort_begin_batch(Tuplesortstate *state)
 	 * inittapes(), if needed
 	 */
 
-	state->result_tape = -1;	/* flag that result tape has not been formed */
+	state->result_tape = NULL;	/* flag that result tape has not been formed */
 
 	MemoryContextSwitchTo(oldcontext);
 }
@@ -2202,7 +2203,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 				if (state->eof_reached)
 					return false;
 
-				if ((tuplen = getlen(state, state->result_tape, true)) != 0)
+				if ((tuplen = getlen(state->result_tape, true)) != 0)
 				{
 					READTUP(state, stup, state->result_tape, tuplen);
 
@@ -2235,8 +2236,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 				 * end of file; back up to fetch last tuple's ending length
 				 * word.  If seek fails we must have a completely empty file.
 				 */
-				nmoved = LogicalTapeBackspace(state->tapeset,
-											  state->result_tape,
+				nmoved = LogicalTapeBackspace(state->result_tape,
 											  2 * sizeof(unsigned int));
 				if (nmoved == 0)
 					return false;
@@ -2250,20 +2250,18 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 				 * Back up and fetch previously-returned tuple's ending length
 				 * word.  If seek fails, assume we are at start of file.
 				 */
-				nmoved = LogicalTapeBackspace(state->tapeset,
-											  state->result_tape,
+				nmoved = LogicalTapeBackspace(state->result_tape,
 											  sizeof(unsigned int));
 				if (nmoved == 0)
 					return false;
 				else if (nmoved != sizeof(unsigned int))
 					elog(ERROR, "unexpected tape position");
-				tuplen = getlen(state, state->result_tape, false);
+				tuplen = getlen(state->result_tape, false);
 
 				/*
 				 * Back up to get ending length word of tuple before it.
 				 */
-				nmoved = LogicalTapeBackspace(state->tapeset,
-											  state->result_tape,
+				nmoved = LogicalTapeBackspace(state->result_tape,
 											  tuplen + 2 * sizeof(unsigned int));
 				if (nmoved == tuplen + sizeof(unsigned int))
 				{
@@ -2280,15 +2278,14 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 					elog(ERROR, "bogus tuple length in backward scan");
 			}
 
-			tuplen = getlen(state, state->result_tape, false);
+			tuplen = getlen(state->result_tape, false);
 
 			/*
 			 * Now we have the length of the prior tuple, back up and read it.
 			 * Note: READTUP expects we are positioned after the initial
 			 * length word of the tuple, so back up to that point.
 			 */
-			nmoved = LogicalTapeBackspace(state->tapeset,
-										  state->result_tape,
+			nmoved = LogicalTapeBackspace(state->result_tape,
 										  tuplen);
 			if (nmoved != tuplen)
 				elog(ERROR, "bogus tuple length in backward scan");
@@ -2346,11 +2343,10 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 					tuplesort_heap_delete_top(state);
 
 					/*
-					 * Rewind to free the read buffer.  It'd go away at the
-					 * end of the sort anyway, but better to release the
-					 * memory early.
+					 * Close the tape.  It'd go away at the end of the sort
+					 * anyway, but better to release the memory early.
 					 */
-					LogicalTapeRewindForWrite(state->tapeset, srcTape);
+					LogicalTapeClose(state->tapes[srcTape]);
 					return true;
 				}
 				newtup.srctape = srcTape;
@@ -2648,9 +2644,12 @@ inittapes(Tuplesortstate *state, bool mergeruns)
 	/* Create the tape set and allocate the per-tape data arrays */
 	inittapestate(state, maxTapes);
 	state->tapeset =
-		LogicalTapeSetCreate(maxTapes, false, NULL,
+		LogicalTapeSetCreate(false,
 							 state->shared ? &state->shared->fileset : NULL,
 							 state->worker);
+	state->tapes = palloc(maxTapes * sizeof(LogicalTape *));
+	for (j = 0; j < maxTapes; j++)
+		state->tapes[j] = LogicalTapeCreate(state->tapeset);
 
 	state->currentRun = 0;
 
@@ -2900,7 +2899,7 @@ mergeruns(Tuplesortstate *state)
 
 	/* End of step D2: rewind all output tapes to prepare for merging */
 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-		LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);
+		LogicalTapeRewindForRead(state->tapes[tapenum], state->read_buffer_size);
 
 	for (;;)
 	{
@@ -2962,11 +2961,14 @@ mergeruns(Tuplesortstate *state)
 		/* Step D6: decrease level */
 		if (--state->Level == 0)
 			break;
+
 		/* rewind output tape T to use as new input */
-		LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
+		LogicalTapeRewindForRead(state->tapes[state->tp_tapenum[state->tapeRange]],
 								 state->read_buffer_size);
-		/* rewind used-up input tape P, and prepare it for write pass */
-		LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
+
+		/* close used-up input tape P, and create a new one for write pass */
+		LogicalTapeClose(state->tapes[state->tp_tapenum[state->tapeRange - 1]]);
+		state->tapes[state->tp_tapenum[state->tapeRange - 1]] = LogicalTapeCreate(state->tapeset);
 		state->tp_runs[state->tapeRange - 1] = 0;
 
 		/*
@@ -2994,18 +2996,21 @@ mergeruns(Tuplesortstate *state)
 	 * output tape while rewinding it.  The last iteration of step D6 would be
 	 * a waste of cycles anyway...
 	 */
-	state->result_tape = state->tp_tapenum[state->tapeRange];
+	state->result_tape = state->tapes[state->tp_tapenum[state->tapeRange]];
 	if (!WORKER(state))
-		LogicalTapeFreeze(state->tapeset, state->result_tape, NULL);
+		LogicalTapeFreeze(state->result_tape, NULL);
 	else
 		worker_freeze_result_tape(state);
 	state->status = TSS_SORTEDONTAPE;
 
-	/* Release the read buffers of all the other tapes, by rewinding them. */
+	/* Close all the other tapes, to release their read buffers. */
 	for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
 	{
-		if (tapenum != state->result_tape)
-			LogicalTapeRewindForWrite(state->tapeset, tapenum);
+		if (state->tapes[tapenum] != state->result_tape)
+		{
+			LogicalTapeClose(state->tapes[tapenum]);
+			state->tapes[tapenum] = NULL;
+		}
 	}
 }
 
@@ -3018,7 +3023,8 @@ mergeruns(Tuplesortstate *state)
 static void
 mergeonerun(Tuplesortstate *state)
 {
-	int			destTape = state->tp_tapenum[state->tapeRange];
+	int			destTapeNum = state->tp_tapenum[state->tapeRange];
+	LogicalTape *destTape = state->tapes[destTapeNum];
 	int			srcTape;
 
 	/*
@@ -3061,7 +3067,7 @@ mergeonerun(Tuplesortstate *state)
 	 * When the heap empties, we're done.  Write an end-of-run marker on the
 	 * output tape, and increment its count of real runs.
 	 */
-	markrunend(state, destTape);
+	markrunend(destTape);
 	state->tp_runs[state->tapeRange]++;
 
 #ifdef TRACE_SORT
@@ -3127,17 +3133,18 @@ beginmerge(Tuplesortstate *state)
  * Returns false on EOF.
  */
 static bool
-mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
+mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup)
 {
+	LogicalTape *srcTape = state->tapes[srcTapeIndex];
 	unsigned int tuplen;
 
-	if (!state->mergeactive[srcTape])
+	if (!state->mergeactive[srcTapeIndex])
 		return false;			/* tape's run is already exhausted */
 
 	/* read next tuple, if any */
-	if ((tuplen = getlen(state, srcTape, true)) == 0)
+	if ((tuplen = getlen(srcTape, true)) == 0)
 	{
-		state->mergeactive[srcTape] = false;
+		state->mergeactive[srcTapeIndex] = false;
 		return false;
 	}
 	READTUP(state, stup, srcTape, tuplen);
@@ -3154,6 +3161,7 @@ mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup)
 static void
 dumptuples(Tuplesortstate *state, bool alltuples)
 {
+	LogicalTape *destTape;
 	int			memtupwrite;
 	int			i;
 
@@ -3220,10 +3228,10 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 #endif
 
 	memtupwrite = state->memtupcount;
+	destTape = state->tapes[state->tp_tapenum[state->destTape]];
 	for (i = 0; i < memtupwrite; i++)
 	{
-		WRITETUP(state, state->tp_tapenum[state->destTape],
-				 &state->memtuples[i]);
+		WRITETUP(state, destTape, &state->memtuples[i]);
 		state->memtupcount--;
 	}
 
@@ -3236,7 +3244,7 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 	 */
 	MemoryContextReset(state->tuplecontext);
 
-	markrunend(state, state->tp_tapenum[state->destTape]);
+	markrunend(destTape);
 	state->tp_runs[state->destTape]++;
 	state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
 
@@ -3270,9 +3278,7 @@ tuplesort_rescan(Tuplesortstate *state)
 			state->markpos_eof = false;
 			break;
 		case TSS_SORTEDONTAPE:
-			LogicalTapeRewindForRead(state->tapeset,
-									 state->result_tape,
-									 0);
+			LogicalTapeRewindForRead(state->result_tape, 0);
 			state->eof_reached = false;
 			state->markpos_block = 0L;
 			state->markpos_offset = 0;
@@ -3303,8 +3309,7 @@ tuplesort_markpos(Tuplesortstate *state)
 			state->markpos_eof = state->eof_reached;
 			break;
 		case TSS_SORTEDONTAPE:
-			LogicalTapeTell(state->tapeset,
-							state->result_tape,
+			LogicalTapeTell(state->result_tape,
 							&state->markpos_block,
 							&state->markpos_offset);
 			state->markpos_eof = state->eof_reached;
@@ -3335,8 +3340,7 @@ tuplesort_restorepos(Tuplesortstate *state)
 			state->eof_reached = state->markpos_eof;
 			break;
 		case TSS_SORTEDONTAPE:
-			LogicalTapeSeek(state->tapeset,
-							state->result_tape,
+			LogicalTapeSeek(state->result_tape,
 							state->markpos_block,
 							state->markpos_offset);
 			state->eof_reached = state->markpos_eof;
@@ -3678,11 +3682,11 @@ reversedirection(Tuplesortstate *state)
  */
 
 static unsigned int
-getlen(Tuplesortstate *state, int tapenum, bool eofOK)
+getlen(LogicalTape *tape, bool eofOK)
 {
 	unsigned int len;
 
-	if (LogicalTapeRead(state->tapeset, tapenum,
+	if (LogicalTapeRead(tape,
 						&len, sizeof(len)) != sizeof(len))
 		elog(ERROR, "unexpected end of tape");
 	if (len == 0 && !eofOK)
@@ -3691,11 +3695,11 @@ getlen(Tuplesortstate *state, int tapenum, bool eofOK)
 }
 
 static void
-markrunend(Tuplesortstate *state, int tapenum)
+markrunend(LogicalTape *tape)
 {
 	unsigned int len = 0;
 
-	LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
+	LogicalTapeWrite(tape, (void *) &len, sizeof(len));
 }
 
 /*
@@ -3873,7 +3877,7 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
 }
 
 static void
-writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
 {
 	MinimalTuple tuple = (MinimalTuple) stup->tuple;
 
@@ -3884,13 +3888,10 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
 	/* total on-disk footprint: */
 	unsigned int tuplen = tupbodylen + sizeof(int);
 
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 (void *) &tuplen, sizeof(tuplen));
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 (void *) tupbody, tupbodylen);
+	LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+	LogicalTapeWrite(tape, (void *) tupbody, tupbodylen);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeWrite(state->tapeset, tapenum,
-						 (void *) &tuplen, sizeof(tuplen));
+		LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
 
 	if (!state->slabAllocatorUsed)
 	{
@@ -3901,7 +3902,7 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 static void
 readtup_heap(Tuplesortstate *state, SortTuple *stup,
-			 int tapenum, unsigned int len)
+			 LogicalTape *tape, unsigned int len)
 {
 	unsigned int tupbodylen = len - sizeof(int);
 	unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
@@ -3911,11 +3912,9 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
 
 	/* read in the tuple proper */
 	tuple->t_len = tuplen;
-	LogicalTapeReadExact(state->tapeset, tapenum,
-						 tupbody, tupbodylen);
+	LogicalTapeReadExact(tape, tupbody, tupbodylen);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeReadExact(state->tapeset, tapenum,
-							 &tuplen, sizeof(tuplen));
+		LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
 	stup->tuple = (void *) tuple;
 	/* set up first-column key value */
 	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
@@ -4116,21 +4115,17 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
 }
 
 static void
-writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
 {
 	HeapTuple	tuple = (HeapTuple) stup->tuple;
 	unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
 
 	/* We need to store t_self, but not other fields of HeapTupleData */
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 &tuplen, sizeof(tuplen));
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 &tuple->t_self, sizeof(ItemPointerData));
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 tuple->t_data, tuple->t_len);
+	LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
+	LogicalTapeWrite(tape, &tuple->t_self, sizeof(ItemPointerData));
+	LogicalTapeWrite(tape, tuple->t_data, tuple->t_len);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeWrite(state->tapeset, tapenum,
-						 &tuplen, sizeof(tuplen));
+		LogicalTapeWrite(tape, &tuplen, sizeof(tuplen));
 
 	if (!state->slabAllocatorUsed)
 	{
@@ -4141,7 +4136,7 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 static void
 readtup_cluster(Tuplesortstate *state, SortTuple *stup,
-				int tapenum, unsigned int tuplen)
+				LogicalTape *tape, unsigned int tuplen)
 {
 	unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
 	HeapTuple	tuple = (HeapTuple) readtup_alloc(state,
@@ -4150,16 +4145,13 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
 	/* Reconstruct the HeapTupleData header */
 	tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
 	tuple->t_len = t_len;
-	LogicalTapeReadExact(state->tapeset, tapenum,
-						 &tuple->t_self, sizeof(ItemPointerData));
+	LogicalTapeReadExact(tape, &tuple->t_self, sizeof(ItemPointerData));
 	/* We don't currently bother to reconstruct t_tableOid */
 	tuple->t_tableOid = InvalidOid;
 	/* Read in the tuple body */
-	LogicalTapeReadExact(state->tapeset, tapenum,
-						 tuple->t_data, tuple->t_len);
+	LogicalTapeReadExact(tape, tuple->t_data, tuple->t_len);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeReadExact(state->tapeset, tapenum,
-							 &tuplen, sizeof(tuplen));
+		LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
 	stup->tuple = (void *) tuple;
 	/* set up first-column key value, if it's a simple column */
 	if (state->indexInfo->ii_IndexAttrNumbers[0] != 0)
@@ -4373,19 +4365,16 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
 }
 
 static void
-writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
 {
 	IndexTuple	tuple = (IndexTuple) stup->tuple;
 	unsigned int tuplen;
 
 	tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 (void *) &tuplen, sizeof(tuplen));
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 (void *) tuple, IndexTupleSize(tuple));
+	LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
+	LogicalTapeWrite(tape, (void *) tuple, IndexTupleSize(tuple));
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeWrite(state->tapeset, tapenum,
-						 (void *) &tuplen, sizeof(tuplen));
+		LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen));
 
 	if (!state->slabAllocatorUsed)
 	{
@@ -4396,16 +4385,14 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 static void
 readtup_index(Tuplesortstate *state, SortTuple *stup,
-			  int tapenum, unsigned int len)
+			  LogicalTape *tape, unsigned int len)
 {
 	unsigned int tuplen = len - sizeof(unsigned int);
 	IndexTuple	tuple = (IndexTuple) readtup_alloc(state, tuplen);
 
-	LogicalTapeReadExact(state->tapeset, tapenum,
-						 tuple, tuplen);
+	LogicalTapeReadExact(tape, tuple, tuplen);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeReadExact(state->tapeset, tapenum,
-							 &tuplen, sizeof(tuplen));
+		LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
 	stup->tuple = (void *) tuple;
 	/* set up first-column key value */
 	stup->datum1 = index_getattr(tuple,
@@ -4447,7 +4434,7 @@ copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup)
 }
 
 static void
-writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
+writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup)
 {
 	void	   *waddr;
 	unsigned int tuplen;
@@ -4472,13 +4459,10 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 	writtenlen = tuplen + sizeof(unsigned int);
 
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 (void *) &writtenlen, sizeof(writtenlen));
-	LogicalTapeWrite(state->tapeset, tapenum,
-					 waddr, tuplen);
+	LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
+	LogicalTapeWrite(tape, waddr, tuplen);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeWrite(state->tapeset, tapenum,
-						 (void *) &writtenlen, sizeof(writtenlen));
+		LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen));
 
 	if (!state->slabAllocatorUsed && stup->tuple)
 	{
@@ -4489,7 +4473,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 static void
 readtup_datum(Tuplesortstate *state, SortTuple *stup,
-			  int tapenum, unsigned int len)
+			  LogicalTape *tape, unsigned int len)
 {
 	unsigned int tuplen = len - sizeof(unsigned int);
 
@@ -4503,8 +4487,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 	else if (!state->tuples)
 	{
 		Assert(tuplen == sizeof(Datum));
-		LogicalTapeReadExact(state->tapeset, tapenum,
-							 &stup->datum1, tuplen);
+		LogicalTapeReadExact(tape, &stup->datum1, tuplen);
 		stup->isnull1 = false;
 		stup->tuple = NULL;
 	}
@@ -4512,16 +4495,14 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 	{
 		void	   *raddr = readtup_alloc(state, tuplen);
 
-		LogicalTapeReadExact(state->tapeset, tapenum,
-							 raddr, tuplen);
+		LogicalTapeReadExact(tape, raddr, tuplen);
 		stup->datum1 = PointerGetDatum(raddr);
 		stup->isnull1 = false;
 		stup->tuple = raddr;
 	}
 
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeReadExact(state->tapeset, tapenum,
-							 &tuplen, sizeof(tuplen));
+		LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
 }
 
 /*
@@ -4633,7 +4614,7 @@ worker_freeze_result_tape(Tuplesortstate *state)
 	TapeShare	output;
 
 	Assert(WORKER(state));
-	Assert(state->result_tape != -1);
+	Assert(state->result_tape != NULL);
 	Assert(state->memtupcount == 0);
 
 	/*
@@ -4649,7 +4630,7 @@ worker_freeze_result_tape(Tuplesortstate *state)
 	 * Parallel worker requires result tape metadata, which is to be stored in
 	 * shared memory for leader
 	 */
-	LogicalTapeFreeze(state->tapeset, state->result_tape, &output);
+	LogicalTapeFreeze(state->result_tape, &output);
 
 	/* Store properties of output tape, and update finished worker count */
 	SpinLockAcquire(&shared->mutex);
@@ -4668,9 +4649,9 @@ static void
 worker_nomergeruns(Tuplesortstate *state)
 {
 	Assert(WORKER(state));
-	Assert(state->result_tape == -1);
+	Assert(state->result_tape == NULL);
 
-	state->result_tape = state->tp_tapenum[state->destTape];
+	state->result_tape = state->tapes[state->tp_tapenum[state->destTape]];
 	worker_freeze_result_tape(state);
 }
 
@@ -4714,9 +4695,13 @@ leader_takeover_tapes(Tuplesortstate *state)
 	 * randomAccess is disallowed for parallel sorts.
 	 */
 	inittapestate(state, nParticipants + 1);
-	state->tapeset = LogicalTapeSetCreate(nParticipants + 1, false,
-										  shared->tapes, &shared->fileset,
+	state->tapeset = LogicalTapeSetCreate(false,
+										  &shared->fileset,
 										  state->worker);
+	state->tapes = palloc(state->maxTapes * sizeof(LogicalTape *));
+	for (j = 0; j < nParticipants; j++)
+		state->tapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
+	/* tapes[nParticipants] represents the "leader tape", which is not used */
 
 	/* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
 	state->currentRun = nParticipants;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index d65099c94aa..da7df7fd5f2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -40,6 +40,7 @@ struct ExprContext;
 struct RangeTblEntry;			/* avoid including parsenodes.h here */
 struct ExprEvalStep;			/* avoid including execExpr.h everywhere */
 struct CopyMultiInsertBuffer;
+struct LogicalTapeSet;
 
 
 /* ----------------
@@ -2178,7 +2179,7 @@ typedef struct AggState
 	bool		table_filled;	/* hash table filled yet? */
 	int			num_hashes;
 	MemoryContext hash_metacxt; /* memory for hash table itself */
-	struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */
+	struct LogicalTapeSet *hash_tapeset;	/* tape set for hash spill tapes */
 	struct HashAggSpill *hash_spills;	/* HashAggSpill for each grouping set,
 										 * exists only during first pass */
 	TupleTableSlot *hash_spill_rslot;	/* for reading spill files */
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index 85d2e03c631..758a19779c6 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -18,9 +18,13 @@
 
 #include "storage/sharedfileset.h"
 
-/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
-
+/*
+ * LogicalTapeSet and LogicalTape are opaque types whose details are not
+ * known outside logtape.c.
+ */
 typedef struct LogicalTapeSet LogicalTapeSet;
+typedef struct LogicalTape LogicalTape;
+
 
 /*
  * The approach tuplesort.c takes to parallel external sorts is that workers,
@@ -54,27 +58,20 @@ typedef struct TapeShare
  * prototypes for functions in logtape.c
  */
 
-extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, bool preallocate,
-											TapeShare *shared,
+extern LogicalTapeSet *LogicalTapeSetCreate(bool preallocate,
 											SharedFileSet *fileset, int worker);
+extern void LogicalTapeClose(LogicalTape *lt);
 extern void LogicalTapeSetClose(LogicalTapeSet *lts);
+extern LogicalTape *LogicalTapeCreate(LogicalTapeSet *lts);
+extern LogicalTape *LogicalTapeImport(LogicalTapeSet *lts, int worker, TapeShare *shared);
 extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
-extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
-							  void *ptr, size_t size);
-extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
-							 void *ptr, size_t size);
-extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
-									 size_t buffer_size);
-extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
-extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum,
-							  TapeShare *share);
-extern void LogicalTapeSetExtend(LogicalTapeSet *lts, int nAdditional);
-extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
-								   size_t size);
-extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
-							long blocknum, int offset);
-extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
-							long *blocknum, int *offset);
+extern size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeRewindForRead(LogicalTape *lt, size_t buffer_size);
+extern void LogicalTapeFreeze(LogicalTape *lt, TapeShare *share);
+extern size_t LogicalTapeBackspace(LogicalTape *lt, size_t size);
+extern void LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset);
+extern void LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset);
 extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
 
 #endif							/* LOGTAPE_H */
-- 
2.29.2

>From 3bd040f5b8042589c39e3b1c0ea167192f0e8c48 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Thu, 22 Oct 2020 14:34:48 +0300
Subject: [PATCH v3 2/3] Replace polyphase merge algorithm with a simple
 balanced k-way merge.

The advantage of polyphase merge is that it can reuse the input tapes as
output tapes efficiently, but that is irrelevant on modern hardware, when
we can easily emulate any number of tape drives. The number of input tapes
we can/should use during merging is limited by work_mem, but output tapes
that we are not currently writing to only cost a little bit of memory, so
there is no need to skimp on them.

This makes sorts that need multiple merge passes faster.
---
 src/backend/utils/sort/tuplesort.c | 662 +++++++++++++----------------
 1 file changed, 293 insertions(+), 369 deletions(-)

diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 285f6127bc0..14104643e78 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -10,14 +10,22 @@
  * amounts are sorted using temporary files and a standard external sort
  * algorithm.
  *
- * See Knuth, volume 3, for more than you want to know about the external
- * sorting algorithm.  Historically, we divided the input into sorted runs
- * using replacement selection, in the form of a priority tree implemented
- * as a heap (essentially his Algorithm 5.2.3H), but now we always use
- * quicksort for run generation.  We merge the runs using polyphase merge,
- * Knuth's Algorithm 5.4.2D.  The logical "tapes" used by Algorithm D are
- * implemented by logtape.c, which avoids space wastage by recycling disk
- * space as soon as each block is read from its "tape".
+ * See Knuth, volume 3, for more than you want to know about external
+ * sorting algorithms.  The algorithm we use is a balanced k-way merge.
+ * Before PostgreSQL 14, we used the polyphase merge algorithm (Knuth's
+ * Algorithm 5.4.2D), but with modern hardware, a straightforward balanced
+ * merge is better.  Knuth is assuming that tape drives are expensive
+ * beasts, and in particular that there will always be many more runs than
+ * tape drives.  The polyphase merge algorithm was good at keeping all the
+ * tape drives busy, but in our implementation a "tape drive" doesn't cost
+ * much more than a few Kb of memory buffers, so we can afford to have
+ * lots of them.  In particular, if we can have as many tape drives as
+ * sorted runs, we can eliminate any repeated I/O at all.
+ *
+ * Historically, we divided the input into sorted runs using replacement
+ * selection, in the form of a priority tree implemented as a heap
+ * (essentially Knuth's Algorithm 5.2.3H), but now we always use quicksort
+ * for run generation.
  *
  * The approximate amount of memory allowed for any one sort operation
  * is specified in kilobytes by the caller (most pass work_mem).  Initially,
@@ -27,9 +35,11 @@
  * tuples just by scanning the tuple array sequentially.  If we do exceed
  * workMem, we begin to emit tuples into sorted runs in temporary tapes.
  * When tuples are dumped in batch after quicksorting, we begin a new run
- * with a new output tape (selected per Algorithm D).  After the end of the
- * input is reached, we dump out remaining tuples in memory into a final run,
- * then merge the runs using Algorithm D.
+ * with a new output tape.  If we reach the max number of tapes, we write
+ * subsequent runs on the existing tapes in a round-robin fashion.  We will
+ * need multiple merge passes to finish the merge in that case.  After the
+ * end of the input is reached, we dump out remaining tuples in memory into
+ * a final run, then merge the runs.
  *
  * When merging runs, we use a heap containing just the frontmost tuple from
  * each source run; we repeatedly output the smallest tuple and replace it
@@ -52,6 +62,14 @@
  * accesses.  The pre-reading is handled by logtape.c, we just tell it how
  * much memory to use for the buffers.
  *
+ * In the current code we determine the number of input tapes M on the basis
+ * of workMem: we want workMem/M to be large enough that we read a fair
+ * amount of data each time we read from a tape, so as to maintain the
+ * locality of access described above.  Nonetheless, with large workMem we
+ * can have many tapes.  The logical "tapes" are implemented by logtape.c,
+ * which avoids space wastage by recycling disk space as soon as each block
+ * is read from its "tape".
+ *
  * When the caller requests random access to the sort result, we form
  * the final sorted run on a logical tape which is then "frozen", so
  * that we can access it randomly.  When the caller does not need random
@@ -60,20 +78,6 @@
  * on-the-fly as the caller repeatedly calls tuplesort_getXXX; this
  * saves one cycle of writing all the data out to disk and reading it in.
  *
- * Before Postgres 8.2, we always used a seven-tape polyphase merge, on the
- * grounds that 7 is the "sweet spot" on the tapes-to-passes curve according
- * to Knuth's figure 70 (section 5.4.2).  However, Knuth is assuming that
- * tape drives are expensive beasts, and in particular that there will always
- * be many more runs than tape drives.  In our implementation a "tape drive"
- * doesn't cost much more than a few Kb of memory buffers, so we can afford
- * to have lots of them.  In particular, if we can have as many tape drives
- * as sorted runs, we can eliminate any repeated I/O at all.  In the current
- * code we determine the number of tapes M on the basis of workMem: we want
- * workMem/M to be large enough that we read a fair amount of data each time
- * we preread from a tape, so as to maintain the locality of access described
- * above.  Nonetheless, with large workMem we can have many tapes (but not
- * too many -- see the comments in tuplesort_merge_order).
- *
  * This module supports parallel sorting.  Parallel sorts involve coordination
  * among one or more worker processes, and a leader process, each with its own
  * tuplesort state.  The leader process (or, more accurately, the
@@ -223,8 +227,9 @@ typedef enum
  * worth of buffer space.  This ignores the overhead of all the other data
  * structures needed for each tape, but it's probably close enough.
  *
- * MERGE_BUFFER_SIZE is how much data we'd like to read from each input
- * tape during a preread cycle (see discussion at top of file).
+ * MERGE_BUFFER_SIZE is how much buffer space we'd like to allocate for each
+ * input tape, for pre-reading (see discussion at top of file).  This is *in
+ * addition to* the 1 block already included in TAPE_BUFFER_OVERHEAD.
  */
 #define MINORDER		6		/* minimum merge order */
 #define MAXORDER		500		/* maximum merge order */
@@ -249,8 +254,8 @@ struct Tuplesortstate
 	bool		tuples;			/* Can SortTuple.tuple ever be set? */
 	int64		availMem;		/* remaining memory available, in bytes */
 	int64		allowedMem;		/* total memory allowed, in bytes */
-	int			maxTapes;		/* number of tapes (Knuth's T) */
-	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
+	int			maxTapes;		/* max number of input tapes to merge in each
+								 * pass */
 	int64		maxSpace;		/* maximum amount of space occupied among sort
 								 * of groups, either in-memory or on-disk */
 	bool		isMaxSpaceDisk; /* true when maxSpace is value for on-disk
@@ -262,7 +267,6 @@ struct Tuplesortstate
 	MemoryContext sortcontext;	/* memory context holding most sort data */
 	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
-	LogicalTape **tapes;
 
 	/*
 	 * These function pointers decouple the routines that must know what kind
@@ -347,8 +351,8 @@ struct Tuplesortstate
 	char	   *slabMemoryEnd;	/* end of slab memory arena */
 	SlabSlot   *slabFreeHead;	/* head of free list */
 
-	/* Buffer size to use for reading input tapes, during merge. */
-	size_t		read_buffer_size;
+	/* Memory used for input and output tape buffers. */
+	size_t		tape_buffer_mem;
 
 	/*
 	 * When we return a tuple to the caller in tuplesort_gettuple_XXX, that
@@ -365,36 +369,29 @@ struct Tuplesortstate
 	int			currentRun;
 
 	/*
-	 * Unless otherwise noted, all pointer variables below are pointers to
-	 * arrays of length maxTapes, holding per-tape data.
+	 * Logical tapes, for merging.
+	 *
+	 * The initial runs are written in the output tapes.  In each merge pass,
+	 * the output tapes of the previous pass become the input tapes, and new
+	 * output tapes are created as needed.  When nInputTapes equals
+	 * nInputRuns, there is only one merge pass left.
 	 */
+	LogicalTape **inputTapes;
+	int			nInputTapes;
+	int			nInputRuns;
 
-	/*
-	 * This variable is only used during merge passes.  mergeactive[i] is true
-	 * if we are reading an input run from (actual) tape number i and have not
-	 * yet exhausted that run.
-	 */
-	bool	   *mergeactive;	/* active input run source? */
+	LogicalTape **outputTapes;
+	int			nOutputTapes;
+	int			nOutputRuns;
 
-	/*
-	 * Variables for Algorithm D.  Note that destTape is a "logical" tape
-	 * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
-	 * "logical" and "actual" tape numbers straight!
-	 */
-	int			Level;			/* Knuth's l */
-	int			destTape;		/* current output tape (Knuth's j, less 1) */
-	int		   *tp_fib;			/* Target Fibonacci run counts (A[]) */
-	int		   *tp_runs;		/* # of real runs on each tape */
-	int		   *tp_dummy;		/* # of dummy runs for each tape (D[]) */
-	int		   *tp_tapenum;		/* Actual tape numbers (TAPE[]) */
-	int			activeTapes;	/* # of active input tapes in merge pass */
+	LogicalTape *destTape;		/* current output tape */
 
 	/*
 	 * These variables are used after completion of sorting to keep track of
 	 * the next tuple to return.  (In the tape case, the tape's current read
 	 * position is also critical state.)
 	 */
-	LogicalTape *result_tape;	/* tape of finished output */
+	LogicalTape *result_tape;	/* actual tape of finished output */
 	int			current;		/* array index (only used if SORTEDINMEM) */
 	bool		eof_reached;	/* reached EOF (needed for cursors) */
 
@@ -415,8 +412,9 @@ struct Tuplesortstate
 	 *
 	 * nParticipants is the number of worker Tuplesortstates known by the
 	 * leader to have actually been launched, which implies that they must
-	 * finish a run leader can merge.  Typically includes a worker state held
-	 * by the leader process itself.  Set in the leader Tuplesortstate only.
+	 * finish a run that the leader needs to merge.  Typically includes a
+	 * worker state held by the leader process itself.  Set in the leader
+	 * Tuplesortstate only.
 	 */
 	int			worker;
 	Sharedsort *shared;
@@ -620,7 +618,7 @@ static void init_slab_allocator(Tuplesortstate *state, int numSlots);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
 static void beginmerge(Tuplesortstate *state);
-static bool mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup);
+static bool mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
 static void make_bounded_heap(Tuplesortstate *state);
 static void sort_bounded_heap(Tuplesortstate *state);
@@ -866,8 +864,8 @@ tuplesort_begin_batch(Tuplesortstate *state)
 	state->currentRun = 0;
 
 	/*
-	 * maxTapes, tapeRange, and Algorithm D variables will be initialized by
-	 * inittapes(), if needed
+	 * Tape variables (inputTapes, outputTapes, etc.) will be initialized by
+	 * inittapes(), if needed.
 	 */
 
 	state->result_tape = NULL;	/* flag that result tape has not been formed */
@@ -1389,6 +1387,10 @@ tuplesort_free(Tuplesortstate *state)
 	 *
 	 * Note: want to include this in reported total cost of sort, hence need
 	 * for two #ifdef TRACE_SORT sections.
+	 *
+	 * We don't bother to destroy the individual tapes here. They will go away
+	 * with the sortcontext.  (In TSS_FINALMERGE state, we have closed
+	 * finished tapes already.)
 	 */
 	if (state->tapeset)
 		LogicalTapeSetClose(state->tapeset);
@@ -2111,7 +2113,7 @@ tuplesort_performsort(Tuplesortstate *state)
 	{
 		if (state->status == TSS_FINALMERGE)
 			elog(LOG, "performsort of worker %d done (except %d-way final merge): %s",
-				 state->worker, state->activeTapes,
+				 state->worker, state->nInputTapes,
 				 pg_rusage_show(&state->ru_start));
 		else
 			elog(LOG, "performsort of worker %d done: %s",
@@ -2319,7 +2321,8 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 			 */
 			if (state->memtupcount > 0)
 			{
-				int			srcTape = state->memtuples[0].srctape;
+				int			srcTapeIndex = state->memtuples[0].srctape;
+				LogicalTape *srcTape = state->inputTapes[srcTapeIndex];
 				SortTuple	newtup;
 
 				*stup = state->memtuples[0];
@@ -2341,15 +2344,16 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 					 * Remove the top node from the heap.
 					 */
 					tuplesort_heap_delete_top(state);
+					state->nInputRuns--;
 
 					/*
 					 * Close the tape.  It'd go away at the end of the sort
 					 * anyway, but better to release the memory early.
 					 */
-					LogicalTapeClose(state->tapes[srcTape]);
+					LogicalTapeClose(srcTape);
 					return true;
 				}
-				newtup.srctape = srcTape;
+				newtup.srctape = srcTapeIndex;
 				tuplesort_heap_replace_top(state, &newtup);
 				return true;
 			}
@@ -2580,18 +2584,29 @@ tuplesort_merge_order(int64 allowedMem)
 {
 	int			mOrder;
 
-	/*
-	 * We need one tape for each merge input, plus another one for the output,
-	 * and each of these tapes needs buffer space.  In addition we want
-	 * MERGE_BUFFER_SIZE workspace per input tape (but the output tape doesn't
-	 * count).
+	/*----------
+	 * In the merge phase, we need buffer space for each input and output tape.
+	 * Each pass in the balanced merge algorithm reads from M input tapes, and
+	 * writes to N output tapes.  Each tape consumes TAPE_BUFFER_OVERHEAD bytes
+	 * of memory.  In addition to that, we want MERGE_BUFFER_SIZE workspace per
+	 * input tape.
+	 *
+	 * totalMem = M * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE) +
+	 *            N * TAPE_BUFFER_OVERHEAD
+	 *
+	 * Except for the last and next-to-last merge passes, where there can be
+	 * fewer tapes left to process, M = N.  We choose M so that we have the
+	 * desired amount of memory available for the input buffers
+	 * (TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE), given the total memory
+	 * available for the tape buffers (allowedMem).
 	 *
 	 * Note: you might be thinking we need to account for the memtuples[]
 	 * array in this calculation, but we effectively treat that as part of the
 	 * MERGE_BUFFER_SIZE workspace.
+	 *----------
 	 */
-	mOrder = (allowedMem - TAPE_BUFFER_OVERHEAD) /
-		(MERGE_BUFFER_SIZE + TAPE_BUFFER_OVERHEAD);
+	mOrder = allowedMem /
+		(2 * TAPE_BUFFER_OVERHEAD + MERGE_BUFFER_SIZE);
 
 	/*
 	 * Even in minimum memory, use at least a MINORDER merge.  On the other
@@ -2601,7 +2616,7 @@ tuplesort_merge_order(int64 allowedMem)
 	 * which in turn can cause the same sort to need more runs, which makes
 	 * merging slower even if it can still be done in a single pass.  Also,
 	 * high order merges are quite slow due to CPU cache effects; it can be
-	 * faster to pay the I/O cost of a polyphase merge than to perform a
+	 * faster to pay the I/O cost of a multi-pass merge than to perform a
 	 * single merge pass across many hundreds of tapes.
 	 */
 	mOrder = Max(mOrder, MINORDER);
@@ -2610,6 +2625,42 @@ tuplesort_merge_order(int64 allowedMem)
 	return mOrder;
 }
 
+/*
+ * Helper function to calculate how much memory to allocate for the read buffer
+ * of each input tape in a merge pass.
+ *
+ * 'avail_mem' is the amount of memory available for the buffers of all the
+ *		tapes, both input and output.
+ * 'nInputTapes' and 'nInputRuns' are the number of input tapes and runs.
+ * 'maxOutputTapes' is the max. number of output tapes we should produce.
+ */
+static int64
+merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
+					   int maxOutputTapes)
+{
+	int			nOutputRuns;
+	int			nOutputTapes;
+
+	/*
+	 * How many output tapes will we produce in this pass?
+	 *
+	 * This is nInputRuns / nInputTapes, rounded up.
+	 */
+	nOutputRuns = (nInputRuns + nInputTapes - 1) / nInputTapes;
+
+	nOutputTapes = Min(nOutputRuns, maxOutputTapes);
+
+	/*
+	 * Each output tape consumes TAPE_BUFFER_OVERHEAD bytes of memory.  All
+	 * remaining memory is divided evenly between the input tapes.
+	 *
+	 * This also follows from the formula in tuplesort_merge_order, but here
+	 * we derive the input buffer size from the amount of memory available,
+	 * and M and N.
+	 */
+	return (avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes;
+}
+
 /*
  * inittapes - initialize for tape sorting.
  *
@@ -2618,58 +2669,49 @@ tuplesort_merge_order(int64 allowedMem)
 static void
 inittapes(Tuplesortstate *state, bool mergeruns)
 {
-	int			maxTapes,
-				j;
-
 	Assert(!LEADER(state));
 
 	if (mergeruns)
 	{
-		/* Compute number of tapes to use: merge order plus 1 */
-		maxTapes = tuplesort_merge_order(state->allowedMem) + 1;
+		/* Compute number of input tapes to use when merging */
+		state->maxTapes = tuplesort_merge_order(state->allowedMem);
 	}
 	else
 	{
 		/* Workers can sometimes produce single run, output without merge */
 		Assert(WORKER(state));
-		maxTapes = MINORDER + 1;
+		state->maxTapes = MINORDER;
 	}
 
 #ifdef TRACE_SORT
 	if (trace_sort)
 		elog(LOG, "worker %d switching to external sort with %d tapes: %s",
-			 state->worker, maxTapes, pg_rusage_show(&state->ru_start));
+			 state->worker, state->maxTapes, pg_rusage_show(&state->ru_start));
 #endif
 
-	/* Create the tape set and allocate the per-tape data arrays */
-	inittapestate(state, maxTapes);
+	/* Create the tape set */
+	inittapestate(state, state->maxTapes);
 	state->tapeset =
 		LogicalTapeSetCreate(false,
 							 state->shared ? &state->shared->fileset : NULL,
 							 state->worker);
-	state->tapes = palloc(maxTapes * sizeof(LogicalTape *));
-	for (j = 0; j < maxTapes; j++)
-		state->tapes[j] = LogicalTapeCreate(state->tapeset);
 
 	state->currentRun = 0;
 
 	/*
-	 * Initialize variables of Algorithm D (step D1).
+	 * Initialize logical tape arrays.
 	 */
-	for (j = 0; j < maxTapes; j++)
-	{
-		state->tp_fib[j] = 1;
-		state->tp_runs[j] = 0;
-		state->tp_dummy[j] = 1;
-		state->tp_tapenum[j] = j;
-	}
-	state->tp_fib[state->tapeRange] = 0;
-	state->tp_dummy[state->tapeRange] = 0;
+	state->inputTapes = NULL;
+	state->nInputTapes = 0;
+	state->nInputRuns = 0;
 
-	state->Level = 1;
-	state->destTape = 0;
+	state->outputTapes = palloc0(state->maxTapes * sizeof(LogicalTape *));
+	state->nOutputTapes = 0;
+	state->nOutputRuns = 0;
 
 	state->status = TSS_BUILDRUNS;
+
+	selectnewtape(state);
 }
 
 /*
@@ -2700,52 +2742,37 @@ inittapestate(Tuplesortstate *state, int maxTapes)
 	 * called already, but it doesn't matter if it is called a second time.
 	 */
 	PrepareTempTablespaces();
-
-	state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
-	state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
-	state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
-	state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
-	state->tp_tapenum = (int *) palloc0(maxTapes * sizeof(int));
-
-	/* Record # of tapes allocated (for duration of sort) */
-	state->maxTapes = maxTapes;
-	/* Record maximum # of tapes usable as inputs when merging */
-	state->tapeRange = maxTapes - 1;
 }
 
 /*
- * selectnewtape -- select new tape for new initial run.
+ * selectnewtape -- select next tape to output to.
  *
  * This is called after finishing a run when we know another run
- * must be started.  This implements steps D3, D4 of Algorithm D.
+ * must be started.  This is used both when building the initial
+ * runs, and during merge passes.
  */
 static void
 selectnewtape(Tuplesortstate *state)
 {
-	int			j;
-	int			a;
-
-	/* Step D3: advance j (destTape) */
-	if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
-	{
-		state->destTape++;
-		return;
-	}
-	if (state->tp_dummy[state->destTape] != 0)
+	if (state->nOutputRuns < state->maxTapes)
 	{
-		state->destTape = 0;
-		return;
+		/* Create a new tape to hold the next run */
+		Assert(state->outputTapes[state->nOutputRuns] == NULL);
+		Assert(state->nOutputRuns == state->nOutputTapes);
+		state->destTape = LogicalTapeCreate(state->tapeset);
+		state->outputTapes[state->nOutputRuns] = state->destTape;
+		state->nOutputTapes++;
+		state->nOutputRuns++;
 	}
-
-	/* Step D4: increase level */
-	state->Level++;
-	a = state->tp_fib[0];
-	for (j = 0; j < state->tapeRange; j++)
+	else
 	{
-		state->tp_dummy[j] = a + state->tp_fib[j + 1] - state->tp_fib[j];
-		state->tp_fib[j] = a + state->tp_fib[j + 1];
+		/*
+		 * We have reached the max number of tapes.  Append to an existing
+		 * tape.
+		 */
+		state->destTape = state->outputTapes[state->nOutputRuns % state->nOutputTapes];
+		state->nOutputRuns++;
 	}
-	state->destTape = 0;
 }
 
 /*
@@ -2784,18 +2811,13 @@ init_slab_allocator(Tuplesortstate *state, int numSlots)
 /*
  * mergeruns -- merge all the completed initial runs.
  *
- * This implements steps D5, D6 of Algorithm D.  All input data has
+ * This implements the Balanced k-Way Merge Algorithm.  All input data has
  * already been written to initial runs on tape (see dumptuples).
  */
 static void
 mergeruns(Tuplesortstate *state)
 {
-	int			tapenum,
-				svTape,
-				svRuns,
-				svDummy;
-	int			numTapes;
-	int			numInputTapes;
+	int			tapenum;
 
 	Assert(state->status == TSS_BUILDRUNS);
 	Assert(state->memtupcount == 0);
@@ -2830,99 +2852,111 @@ mergeruns(Tuplesortstate *state)
 	pfree(state->memtuples);
 	state->memtuples = NULL;
 
-	/*
-	 * If we had fewer runs than tapes, refund the memory that we imagined we
-	 * would need for the tape buffers of the unused tapes.
-	 *
-	 * numTapes and numInputTapes reflect the actual number of tapes we will
-	 * use.  Note that the output tape's tape number is maxTapes - 1, so the
-	 * tape numbers of the used tapes are not consecutive, and you cannot just
-	 * loop from 0 to numTapes to visit all used tapes!
-	 */
-	if (state->Level == 1)
-	{
-		numInputTapes = state->currentRun;
-		numTapes = numInputTapes + 1;
-		FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
-	}
-	else
-	{
-		numInputTapes = state->tapeRange;
-		numTapes = state->maxTapes;
-	}
-
 	/*
 	 * Initialize the slab allocator.  We need one slab slot per input tape,
 	 * for the tuples in the heap, plus one to hold the tuple last returned
 	 * from tuplesort_gettuple.  (If we're sorting pass-by-val Datums,
 	 * however, we don't need to do allocate anything.)
 	 *
+	 * In a multi-pass merge, we could shrink this allocation for the last
+	 * merge pass, if it has fewer tapes than previous passes, but we don't
+	 * bother.
+	 *
 	 * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism
 	 * to track memory usage of individual tuples.
 	 */
 	if (state->tuples)
-		init_slab_allocator(state, numInputTapes + 1);
+		init_slab_allocator(state, state->nOutputTapes + 1);
 	else
 		init_slab_allocator(state, 0);
 
 	/*
 	 * Allocate a new 'memtuples' array, for the heap.  It will hold one tuple
 	 * from each input tape.
+	 *
+	 * We could shrink this, too, between passes in a multi-pass merge, but we
+	 * don't bother.  (The initial input tapes are still in outputTapes.  The
+	 * number of input tapes will not increase between passes.)
 	 */
-	state->memtupsize = numInputTapes;
+	state->memtupsize = state->nOutputTapes;
 	state->memtuples = (SortTuple *) MemoryContextAlloc(state->maincontext,
-														numInputTapes * sizeof(SortTuple));
+														state->nOutputTapes * sizeof(SortTuple));
 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
 
 	/*
-	 * Use all the remaining memory we have available for read buffers among
-	 * the input tapes.
-	 *
-	 * We don't try to "rebalance" the memory among tapes, when we start a new
-	 * merge phase, even if some tapes are inactive in the new phase.  That
-	 * would be hard, because logtape.c doesn't know where one run ends and
-	 * another begins.  When a new merge phase begins, and a tape doesn't
-	 * participate in it, its buffer nevertheless already contains tuples from
-	 * the next run on same tape, so we cannot release the buffer.  That's OK
-	 * in practice, merge performance isn't that sensitive to the amount of
-	 * buffers used, and most merge phases use all or almost all tapes,
-	 * anyway.
+	 * Use all the remaining memory we have available for tape buffers among
+	 * all the input tapes.  At the beginning of each merge pass, we will
+	 * divide this memory between the input and output tapes in the pass.
 	 */
+	state->tape_buffer_mem = state->availMem;
+	USEMEM(state, state->availMem);
 #ifdef TRACE_SORT
 	if (trace_sort)
-		elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for read buffers among %d input tapes",
-			 state->worker, state->availMem / 1024, numInputTapes);
+		elog(LOG, "worker %d using " INT64_FORMAT " KB of memory for tape buffers",
+			 state->worker, state->tape_buffer_mem / 1024);
 #endif
 
-	state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
-	USEMEM(state, state->read_buffer_size * numInputTapes);
-
-	/* End of step D2: rewind all output tapes to prepare for merging */
-	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-		LogicalTapeRewindForRead(state->tapes[tapenum], state->read_buffer_size);
-
 	for (;;)
 	{
 		/*
-		 * At this point we know that tape[T] is empty.  If there's just one
-		 * (real or dummy) run left on each input tape, then only one merge
-		 * pass remains.  If we don't have to produce a materialized sorted
-		 * tape, we can stop at this point and do the final merge on-the-fly.
+		 * On the first iteration, or if we have read all the runs from the
+		 * input tapes in a multi-pass merge, it's time to start a new pass.
+		 * Rewind all the output tapes, and make them inputs for the next
+		 * pass.
 		 */
-		if (!state->randomAccess && !WORKER(state))
+		if (state->nInputRuns == 0 && !WORKER(state))
 		{
-			bool		allOneRun = true;
+			int64		input_buffer_size;
 
-			Assert(state->tp_runs[state->tapeRange] == 0);
-			for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
+			/* Close the old, emptied, input tapes */
+			if (state->nInputTapes > 0)
 			{
-				if (state->tp_runs[tapenum] + state->tp_dummy[tapenum] != 1)
-				{
-					allOneRun = false;
-					break;
-				}
+				for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+					LogicalTapeClose(state->inputTapes[tapenum]);
+				pfree(state->inputTapes);
 			}
-			if (allOneRun)
+
+			/* Previous pass's outputs become next pass's inputs. */
+			state->inputTapes = state->outputTapes;
+			state->nInputTapes = state->nOutputTapes;
+			state->nInputRuns = state->nOutputRuns;
+
+			/*
+			 * Reset output tape variables.  The actual LogicalTapes will be
+			 * created as needed, here we only allocate the array to hold
+			 * them.
+			 */
+			state->outputTapes = palloc0(state->nInputTapes * sizeof(LogicalTape *));
+			state->nOutputTapes = 0;
+			state->nOutputRuns = 0;
+
+			/*
+			 * Redistribute the memory allocated for tape buffers, among the
+			 * new input and output tapes.
+			 */
+			input_buffer_size = merge_read_buffer_size(state->tape_buffer_mem,
+													   state->nInputTapes,
+													   state->nInputRuns,
+													   state->maxTapes);
+
+#ifdef TRACE_SORT
+			if (trace_sort)
+				elog(LOG, "starting merge pass of %d input runs on %d tapes, " INT64_FORMAT " KB of memory for each input tape: %s",
+					 state->nInputRuns, state->nInputTapes, input_buffer_size / 1024,
+					 pg_rusage_show(&state->ru_start));
+#endif
+
+			/* Prepare the new input tapes for merge pass. */
+			for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+				LogicalTapeRewindForRead(state->inputTapes[tapenum], input_buffer_size);
+
+			/*
+			 * If there's just one run left on each input tape, then only one
+			 * merge pass remains.  If we don't have to produce a materialized
+			 * sorted tape, we can stop at this point and do the final merge
+			 * on-the-fly.
+			 */
+			if (!state->randomAccess && state->nInputRuns <= state->nInputTapes)
 			{
 				/* Tell logtape.c we won't be writing anymore */
 				LogicalTapeSetForgetFreeSpace(state->tapeset);
@@ -2933,103 +2967,47 @@ mergeruns(Tuplesortstate *state)
 			}
 		}
 
-		/* Step D5: merge runs onto tape[T] until tape[P] is empty */
-		while (state->tp_runs[state->tapeRange - 1] ||
-			   state->tp_dummy[state->tapeRange - 1])
-		{
-			bool		allDummy = true;
-
-			for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-			{
-				if (state->tp_dummy[tapenum] == 0)
-				{
-					allDummy = false;
-					break;
-				}
-			}
-
-			if (allDummy)
-			{
-				state->tp_dummy[state->tapeRange]++;
-				for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-					state->tp_dummy[tapenum]--;
-			}
-			else
-				mergeonerun(state);
-		}
-
-		/* Step D6: decrease level */
-		if (--state->Level == 0)
-			break;
-
-		/* rewind output tape T to use as new input */
-		LogicalTapeRewindForRead(state->tapes[state->tp_tapenum[state->tapeRange]],
-								 state->read_buffer_size);
+		/* Select an output tape */
+		selectnewtape(state);
 
-		/* close used-up input tape P, and create a new one for write pass */
-		LogicalTapeClose(state->tapes[state->tp_tapenum[state->tapeRange - 1]]);
-		state->tapes[state->tp_tapenum[state->tapeRange - 1]] = LogicalTapeCreate(state->tapeset);
-		state->tp_runs[state->tapeRange - 1] = 0;
+		/* Merge one run from each input tape. */
+		mergeonerun(state);
 
 		/*
-		 * reassign tape units per step D6; note we no longer care about A[]
+		 * If the input tapes are empty, and we output only one output run,
+		 * we're done.  The current output tape contains the final result.
 		 */
-		svTape = state->tp_tapenum[state->tapeRange];
-		svDummy = state->tp_dummy[state->tapeRange];
-		svRuns = state->tp_runs[state->tapeRange];
-		for (tapenum = state->tapeRange; tapenum > 0; tapenum--)
-		{
-			state->tp_tapenum[tapenum] = state->tp_tapenum[tapenum - 1];
-			state->tp_dummy[tapenum] = state->tp_dummy[tapenum - 1];
-			state->tp_runs[tapenum] = state->tp_runs[tapenum - 1];
-		}
-		state->tp_tapenum[0] = svTape;
-		state->tp_dummy[0] = svDummy;
-		state->tp_runs[0] = svRuns;
+		if (state->nInputRuns == 0 && state->nOutputRuns <= 1)
+			break;
 	}
 
 	/*
-	 * Done.  Knuth says that the result is on TAPE[1], but since we exited
-	 * the loop without performing the last iteration of step D6, we have not
-	 * rearranged the tape unit assignment, and therefore the result is on
-	 * TAPE[T].  We need to do it this way so that we can freeze the final
-	 * output tape while rewinding it.  The last iteration of step D6 would be
-	 * a waste of cycles anyway...
+	 * Done.  The result is on a single run on a single tape.
 	 */
-	state->result_tape = state->tapes[state->tp_tapenum[state->tapeRange]];
+	state->result_tape = state->outputTapes[0];
 	if (!WORKER(state))
 		LogicalTapeFreeze(state->result_tape, NULL);
 	else
 		worker_freeze_result_tape(state);
 	state->status = TSS_SORTEDONTAPE;
 
-	/* Close all the other tapes, to release their read buffers. */
-	for (tapenum = 0; tapenum < state->maxTapes; tapenum++)
-	{
-		if (state->tapes[tapenum] != state->result_tape)
-		{
-			LogicalTapeClose(state->tapes[tapenum]);
-			state->tapes[tapenum] = NULL;
-		}
-	}
+	/* Close all the now-empty input tapes, to release their read buffers. */
+	for (tapenum = 0; tapenum < state->nInputTapes; tapenum++)
+		LogicalTapeClose(state->inputTapes[tapenum]);
 }
 
 /*
- * Merge one run from each input tape, except ones with dummy runs.
- *
- * This is the inner loop of Algorithm D step D5.  We know that the
- * output tape is TAPE[T].
+ * Merge one run from each input tape.
  */
 static void
 mergeonerun(Tuplesortstate *state)
 {
-	int			destTapeNum = state->tp_tapenum[state->tapeRange];
-	LogicalTape *destTape = state->tapes[destTapeNum];
-	int			srcTape;
+	int			srcTapeIndex;
+	LogicalTape *srcTape;
 
 	/*
 	 * Start the merge by loading one tuple from each active source tape into
-	 * the heap.  We can also decrease the input run/dummy run counts.
+	 * the heap.
 	 */
 	beginmerge(state);
 
@@ -3043,8 +3021,9 @@ mergeonerun(Tuplesortstate *state)
 		SortTuple	stup;
 
 		/* write the tuple to destTape */
-		srcTape = state->memtuples[0].srctape;
-		WRITETUP(state, destTape, &state->memtuples[0]);
+		srcTapeIndex = state->memtuples[0].srctape;
+		srcTape = state->inputTapes[srcTapeIndex];
+		WRITETUP(state, state->destTape, &state->memtuples[0]);
 
 		/* recycle the slot of the tuple we just wrote out, for the next read */
 		if (state->memtuples[0].tuple)
@@ -3056,72 +3035,47 @@ mergeonerun(Tuplesortstate *state)
 		 */
 		if (mergereadnext(state, srcTape, &stup))
 		{
-			stup.srctape = srcTape;
+			stup.srctape = srcTapeIndex;
 			tuplesort_heap_replace_top(state, &stup);
+
 		}
 		else
+		{
 			tuplesort_heap_delete_top(state);
+			state->nInputRuns--;
+		}
 	}
 
 	/*
 	 * When the heap empties, we're done.  Write an end-of-run marker on the
-	 * output tape, and increment its count of real runs.
+	 * output tape.
 	 */
-	markrunend(destTape);
-	state->tp_runs[state->tapeRange]++;
-
-#ifdef TRACE_SORT
-	if (trace_sort)
-		elog(LOG, "worker %d finished %d-way merge step: %s", state->worker,
-			 state->activeTapes, pg_rusage_show(&state->ru_start));
-#endif
+	markrunend(state->destTape);
 }
 
 /*
  * beginmerge - initialize for a merge pass
  *
- * We decrease the counts of real and dummy runs for each tape, and mark
- * which tapes contain active input runs in mergeactive[].  Then, fill the
- * merge heap with the first tuple from each active tape.
+ * Fill the merge heap with the first tuple from each input tape.
  */
 static void
 beginmerge(Tuplesortstate *state)
 {
 	int			activeTapes;
-	int			tapenum;
-	int			srcTape;
+	int			srcTapeIndex;
 
 	/* Heap should be empty here */
 	Assert(state->memtupcount == 0);
 
-	/* Adjust run counts and mark the active tapes */
-	memset(state->mergeactive, 0,
-		   state->maxTapes * sizeof(*state->mergeactive));
-	activeTapes = 0;
-	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-	{
-		if (state->tp_dummy[tapenum] > 0)
-			state->tp_dummy[tapenum]--;
-		else
-		{
-			Assert(state->tp_runs[tapenum] > 0);
-			state->tp_runs[tapenum]--;
-			srcTape = state->tp_tapenum[tapenum];
-			state->mergeactive[srcTape] = true;
-			activeTapes++;
-		}
-	}
-	Assert(activeTapes > 0);
-	state->activeTapes = activeTapes;
+	activeTapes = Min(state->nInputTapes, state->nInputRuns);
 
-	/* Load the merge heap with the first tuple from each input tape */
-	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+	for (srcTapeIndex = 0; srcTapeIndex < activeTapes; srcTapeIndex++)
 	{
 		SortTuple	tup;
 
-		if (mergereadnext(state, srcTape, &tup))
+		if (mergereadnext(state, state->inputTapes[srcTapeIndex], &tup))
 		{
-			tup.srctape = srcTape;
+			tup.srctape = srcTapeIndex;
 			tuplesort_heap_insert(state, &tup);
 		}
 	}
@@ -3133,20 +3087,13 @@ beginmerge(Tuplesortstate *state)
  * Returns false on EOF.
  */
 static bool
-mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup)
+mergereadnext(Tuplesortstate *state, LogicalTape *srcTape, SortTuple *stup)
 {
-	LogicalTape *srcTape = state->tapes[srcTapeIndex];
 	unsigned int tuplen;
 
-	if (!state->mergeactive[srcTapeIndex])
-		return false;			/* tape's run is already exhausted */
-
 	/* read next tuple, if any */
 	if ((tuplen = getlen(srcTape, true)) == 0)
-	{
-		state->mergeactive[srcTapeIndex] = false;
 		return false;
-	}
 	READTUP(state, stup, srcTape, tuplen);
 
 	return true;
@@ -3161,7 +3108,6 @@ mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup)
 static void
 dumptuples(Tuplesortstate *state, bool alltuples)
 {
-	LogicalTape *destTape;
 	int			memtupwrite;
 	int			i;
 
@@ -3177,22 +3123,13 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 	 * Final call might require no sorting, in rare cases where we just so
 	 * happen to have previously LACKMEM()'d at the point where exactly all
 	 * remaining tuples are loaded into memory, just before input was
-	 * exhausted.
-	 *
-	 * In general, short final runs are quite possible.  Rather than allowing
-	 * a special case where there was a superfluous selectnewtape() call (i.e.
-	 * a call with no subsequent run actually written to destTape), we prefer
-	 * to write out a 0 tuple run.
-	 *
-	 * mergereadnext() is prepared for 0 tuple runs, and will reliably mark
-	 * the tape inactive for the merge when called from beginmerge().  This
-	 * case is therefore similar to the case where mergeonerun() finds a dummy
-	 * run for the tape, and so doesn't need to merge a run from the tape (or
-	 * conceptually "merges" the dummy run, if you prefer).  According to
-	 * Knuth, Algorithm D "isn't strictly optimal" in its method of
-	 * distribution and dummy run assignment; this edge case seems very
-	 * unlikely to make that appreciably worse.
+	 * exhausted.  In general, short final runs are quite possible, but avoid
+	 * creating a completely empty run.  In a worker, though, we must produce
+	 * at least one tape, even if it's empty.
 	 */
+	if (state->memtupcount == 0 && state->currentRun > 0)
+		return;
+
 	Assert(state->status == TSS_BUILDRUNS);
 
 	/*
@@ -3205,6 +3142,9 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 				 errmsg("cannot have more than %d runs for an external sort",
 						INT_MAX)));
 
+	if (state->currentRun > 0)
+		selectnewtape(state);
+
 	state->currentRun++;
 
 #ifdef TRACE_SORT
@@ -3228,10 +3168,9 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 #endif
 
 	memtupwrite = state->memtupcount;
-	destTape = state->tapes[state->tp_tapenum[state->destTape]];
 	for (i = 0; i < memtupwrite; i++)
 	{
-		WRITETUP(state, destTape, &state->memtuples[i]);
+		WRITETUP(state, state->destTape, &state->memtuples[i]);
 		state->memtupcount--;
 	}
 
@@ -3244,19 +3183,14 @@ dumptuples(Tuplesortstate *state, bool alltuples)
 	 */
 	MemoryContextReset(state->tuplecontext);
 
-	markrunend(destTape);
-	state->tp_runs[state->destTape]++;
-	state->tp_dummy[state->destTape]--; /* per Alg D step D2 */
+	markrunend(state->destTape);
 
 #ifdef TRACE_SORT
 	if (trace_sort)
 		elog(LOG, "worker %d finished writing run %d to tape %d: %s",
-			 state->worker, state->currentRun, state->destTape,
+			 state->worker, state->currentRun, (state->currentRun - 1) % state->nOutputTapes + 1,
 			 pg_rusage_show(&state->ru_start));
 #endif
-
-	if (!alltuples)
-		selectnewtape(state);
 }
 
 /*
@@ -4650,8 +4584,9 @@ worker_nomergeruns(Tuplesortstate *state)
 {
 	Assert(WORKER(state));
 	Assert(state->result_tape == NULL);
+	Assert(state->nOutputRuns == 1);
 
-	state->result_tape = state->tapes[state->tp_tapenum[state->destTape]];
+	state->result_tape = state->destTape;
 	worker_freeze_result_tape(state);
 }
 
@@ -4688,47 +4623,36 @@ leader_takeover_tapes(Tuplesortstate *state)
 	 * Create the tapeset from worker tapes, including a leader-owned tape at
 	 * the end.  Parallel workers are far more expensive than logical tapes,
 	 * so the number of tapes allocated here should never be excessive.
-	 *
-	 * We still have a leader tape, though it's not possible to write to it
-	 * due to restrictions in the shared fileset infrastructure used by
-	 * logtape.c.  It will never be written to in practice because
-	 * randomAccess is disallowed for parallel sorts.
 	 */
-	inittapestate(state, nParticipants + 1);
-	state->tapeset = LogicalTapeSetCreate(false,
-										  &shared->fileset,
-										  state->worker);
-	state->tapes = palloc(state->maxTapes * sizeof(LogicalTape *));
-	for (j = 0; j < nParticipants; j++)
-		state->tapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
-	/* tapes[nParticipants] represents the "leader tape", which is not used */
+	inittapestate(state, nParticipants);
+	state->tapeset = LogicalTapeSetCreate(false, &shared->fileset, -1);
 
-	/* mergeruns() relies on currentRun for # of runs (in one-pass cases) */
+	/*
+	 * Set currentRun to reflect the number of runs we will merge (it's not
+	 * used for anything, this is just pro forma)
+	 */
 	state->currentRun = nParticipants;
 
 	/*
-	 * Initialize variables of Algorithm D to be consistent with runs from
-	 * workers having been generated in the leader.
+	 * Initialize the state to look the same as after building the initial
+	 * runs.
 	 *
 	 * There will always be exactly 1 run per worker, and exactly one input
 	 * tape per run, because workers always output exactly 1 run, even when
 	 * there were no input tuples for workers to sort.
 	 */
-	for (j = 0; j < state->maxTapes; j++)
+	state->inputTapes = NULL;
+	state->nInputTapes = 0;
+	state->nInputRuns = 0;
+
+	state->outputTapes = palloc0(nParticipants * sizeof(LogicalTape *));
+	state->nOutputTapes = nParticipants;
+	state->nOutputRuns = nParticipants;
+
+	for (j = 0; j < nParticipants; j++)
 	{
-		/* One real run; no dummy runs for worker tapes */
-		state->tp_fib[j] = 1;
-		state->tp_runs[j] = 1;
-		state->tp_dummy[j] = 0;
-		state->tp_tapenum[j] = j;
+		state->outputTapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]);
 	}
-	/* Leader tape gets one dummy run, and no real runs */
-	state->tp_fib[state->tapeRange] = 0;
-	state->tp_runs[state->tapeRange] = 0;
-	state->tp_dummy[state->tapeRange] = 1;
-
-	state->Level = 1;
-	state->destTape = 0;
 
 	state->status = TSS_BUILDRUNS;
 }
-- 
2.29.2

>From f38f4ba78bb2559e3e8089784fa034f3fcf67f1b Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Fri, 22 Jan 2021 22:59:29 +0200
Subject: [PATCH v3 3/3] Fix sizing of tape read buffers.

---
 src/backend/utils/sort/tuplesort.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 14104643e78..aff2f201f52 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -2658,7 +2658,7 @@ merge_read_buffer_size(int64 avail_mem, int nInputTapes, int nInputRuns,
 	 * we derive the input buffer size from the amount of memory available,
 	 * and M and N.
 	 */
-	return (avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes;
+	return Max((avail_mem - TAPE_BUFFER_OVERHEAD * nOutputTapes) / nInputTapes, 0);
 }
 
 /*
-- 
2.29.2

Reply via email to