On 08/02/2016 01:18 AM, Peter Geoghegan wrote:
Tape unification
----------------

Sort operations have a unique identifier, generated before any workers
are launched, using a scheme based on the leader's PID, and a unique
temp file number. This makes all on-disk state (temp files managed by
logtape.c) discoverable by the leader process. State in shared memory
is sized in proportion to the number of workers, so the only thing
about the data being sorted that gets passed around in shared memory
is a little logtape.c metadata for tapes, describing for example how
large each constituent BufFile is (a BufFile associated with one
particular worker's tapeset).

(See below also for notes on buffile.c's role in all of this, fd.c and
resource management, etc.)

> ...

buffile.c, and "unification"
============================

There has been significant new infrastructure added to make logtape.c
aware of workers. buffile.c has in turn been taught about unification
as a first class part of the abstraction, with low-level management of
certain details occurring within fd.c. So, "tape unification" within
processes to open other backend's logical tapes to generate a unified
logical tapeset for the leader to merge is added. This is probably the
single biggest source of complexity for the patch, since I must
consider:

* Creating a general, reusable abstraction for other possible BufFile
users (logtape.c only has to serve tuplesort.c, though).

* Logical tape free space management.

* Resource management, file lifetime, etc. fd.c resource management
can now close a file at xact end for temp files, while not deleting it
in the leader backend (only the "owning" worker backend deletes the
temp file it owns).

* Crash safety (e.g., when to truncate existing temp files, and when not to).

I find this unification business really complicated. I think it'd be simpler to keep the BufFiles and LogicalTapeSets separate, and instead teach tuplesort.c how to merge tapes that live on different LogicalTapeSets/BufFiles. Or refactor LogicalTapeSet so that a single LogicalTapeSet can contain tapes from different underlying BufFiles.

What I have in mind is something like the attached patch. It refactors the LogicalTapeRead(), LogicalTapeWrite() etc. functions to take a LogicalTape as argument, instead of a LogicalTapeSet and tape number. LogicalTapeSet doesn't have the concept of a tape number anymore; it can contain any number of tapes, and you can create more on the fly. With that, it'd be fairly easy to make tuplesort.c merge LogicalTapes that came from different tape sets, backed by different BufFiles. I think that'd avoid much of the unification code.
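
To illustrate why a per-tape handle makes cross-tapeset merging straightforward, here is a minimal toy sketch. It is not code from the attached patch: the Toy* names are hypothetical, the "tapes" are small in-memory arrays rather than BufFile-backed blocks, and only the back-pointer from tape to tape set mirrors the patch's lt->tapeSet field.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-in for a tape set; only tracks how much space it handed out. */
typedef struct ToyTapeSet
{
	long		nFileBlocks;	/* pretend block count used by this set */
} ToyTapeSet;

/* Toy stand-in for a tape: it knows which set it belongs to. */
typedef struct ToyTape
{
	ToyTapeSet *tapeSet;		/* back-pointer, like lt->tapeSet in the patch */
	int			data[8];		/* in-memory "tape" contents */
	size_t		len;			/* number of values written */
	size_t		pos;			/* next read position */
} ToyTape;

static void
toy_tape_init(ToyTape *t, ToyTapeSet *set)
{
	t->tapeSet = set;
	t->len = 0;
	t->pos = 0;
}

/* Write one value; the tape finds its own set, no tape number needed. */
static void
toy_tape_write(ToyTape *t, int v)
{
	assert(t->len < 8);
	t->data[t->len++] = v;
	t->tapeSet->nFileBlocks++;	/* pretend each value occupies one block */
}

/* Read one value; returns 0 at end of tape, 1 otherwise. */
static int
toy_tape_read(ToyTape *t, int *v)
{
	if (t->pos >= t->len)
		return 0;
	*v = t->data[t->pos++];
	return 1;
}

/*
 * Merge two sorted tapes into out[].  The caller passes only tape handles,
 * so it does not matter whether a and b live in the same tape set.
 */
static size_t
toy_merge(ToyTape *a, ToyTape *b, int *out)
{
	int			va = 0,
				vb = 0;
	int			havea = toy_tape_read(a, &va);
	int			haveb = toy_tape_read(b, &vb);
	size_t		n = 0;

	while (havea || haveb)
	{
		if (havea && (!haveb || va <= vb))
		{
			out[n++] = va;
			havea = toy_tape_read(a, &va);
		}
		else
		{
			out[n++] = vb;
			haveb = toy_tape_read(b, &vb);
		}
	}
	return n;
}
```

The point of the sketch is only that the merge loop never sees a tape set or a tape number, which is what would let tuplesort.c mix tapes from several workers' sets.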

That leaves one problem, though: reusing space in the final merge phase. If the tapes being merged belong to different LogicalTapeSets, and you create one new tape to hold the result, the new tape cannot easily reuse the space of the input tapes because they are on different tape sets. But looking at your patch, ISTM you actually dodged that problem as well:

+        * As a consequence of only being permitted to write to the leader
+        * controlled range, parallel sorts that require a final materialized tape
+        * will use approximately twice the disk space for temp files compared to
+        * a more or less equivalent serial sort.  This is deemed acceptable,
+        * since it is far rarer in practice for parallel sort operations to
+        * require a final materialized output tape.  Note that this does not
+        * apply to any merge process required by workers, which may reuse space
+        * eagerly, just like conventional serial external sorts, and so
+        * typically, parallel sorts consume approximately the same amount of disk
+        * blocks as a more or less equivalent serial sort, even when workers must
+        * perform some merging to produce input to the leader.

I'm slightly worried about that. Maybe it's OK for a first version, but it'd be annoying in a query where a sort is below a merge join, for example, so that you can't do the final merge on the fly because mark/restore support is needed.

One way to fix that would be to have all the parallel workers share the work files to begin with, and keep the "nFileBlocks" value in shared memory so that the workers won't overlap each other. Then all the blocks from different workers would be mixed together, though, which would hurt the sequential pattern of the tapes, so each worker would need to allocate larger chunks to avoid that.
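
A rough sketch of that chunked allocation, just to make the idea concrete. Everything here is hypothetical: the Toy* names and the chunk size are made up, C11 atomics stand in for whatever shared-memory primitive would really be used, and recycling of freed blocks is left out entirely.

```c
#include <assert.h>
#include <stdatomic.h>

#define TOY_CHUNK_BLOCKS 32		/* hypothetical chunk size, in blocks */

/* State shared by all workers: the next unallocated block in the file. */
typedef struct ToySharedFile
{
	atomic_long nFileBlocks;
} ToySharedFile;

/* Per-worker state: the chunk of blocks this worker currently owns. */
typedef struct ToyWorkerAlloc
{
	ToySharedFile *shared;
	long		next;			/* next block to hand out from my chunk */
	long		end;			/* one past the last block of my chunk */
} ToyWorkerAlloc;

static void
toy_worker_init(ToyWorkerAlloc *w, ToySharedFile *shared)
{
	w->shared = shared;
	w->next = 0;
	w->end = 0;					/* empty chunk; first call grabs a real one */
}

/*
 * Return a block number for this worker to write to.  Blocks within a chunk
 * are contiguous, so each worker still writes mostly sequentially, and the
 * atomic add guarantees that chunks of different workers never overlap.
 */
static long
toy_get_block(ToyWorkerAlloc *w)
{
	if (w->next >= w->end)
	{
		w->next = atomic_fetch_add(&w->shared->nFileBlocks,
								   TOY_CHUNK_BLOCKS);
		w->end = w->next + TOY_CHUNK_BLOCKS;
	}
	return w->next++;
}
```

A larger chunk size keeps each worker's writes more sequential, at the cost of more unused space at the tail of each worker's last chunk.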

- Heikki

From a1aa45c22cd13a2059880154e30f48d884a849ef Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakan...@iki.fi>
Date: Wed, 21 Sep 2016 19:31:33 +0300
Subject: [PATCH 1/1] Refactor LogicalTapeSet/LogicalTape interface.

A LogicalTape is now visible to callers, not just an internal object
within logtape.c. All the tape functions, like LogicalTapeRead and
LogicalTapeWrite, take a LogicalTape as argument, instead of
LogicalTapeSet+tape number.

You can create any number of LogicalTapes in a single LogicalTapeSet, and
you don't need to decide the number upfront, when you create the tape set
(although this patch doesn't take any advantage of that yet).
---
 src/backend/utils/sort/logtape.c   | 213 +++++++++++++------------------------
 src/backend/utils/sort/tuplesort.c |  94 ++++++++--------
 src/include/utils/logtape.h        |  67 +++++++++---
 3 files changed, 170 insertions(+), 204 deletions(-)

diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 7745207..6fe66a1 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -103,40 +103,6 @@ typedef struct IndirectBlock
 } IndirectBlock;
 
 /*
- * This data structure represents a single "logical tape" within the set
- * of logical tapes stored in the same file.  We must keep track of the
- * current partially-read-or-written data block as well as the active
- * indirect block level(s).
- */
-typedef struct LogicalTape
-{
-	IndirectBlock *indirect;	/* bottom of my indirect-block hierarchy */
-	bool		writing;		/* T while in write phase */
-	bool		frozen;			/* T if blocks should not be freed when read */
-	bool		dirty;			/* does buffer need to be written? */
-
-	/*
-	 * The total data volume in the logical tape is numFullBlocks * BLCKSZ +
-	 * lastBlockBytes.  BUT: we do not update lastBlockBytes during writing,
-	 * only at completion of a write phase.
-	 */
-	long		numFullBlocks;	/* number of complete blocks in log tape */
-	int			lastBlockBytes; /* valid bytes in last (incomplete) block */
-
-	/*
-	 * Buffer for current data block.  Note we don't bother to store the
-	 * actual file block number of the data block (during the write phase it
-	 * hasn't been assigned yet, and during read we don't care anymore). But
-	 * we do need the relative block number so we can detect end-of-tape while
-	 * reading.
-	 */
-	char	   *buffer;			/* physical buffer (separately palloc'd) */
-	long		curBlockNumber; /* this block's logical blk# within tape */
-	int			pos;			/* next read/write position in buffer */
-	int			nbytes;			/* total # of valid bytes in buffer */
-} LogicalTape;
-
-/*
  * This data structure represents a set of related "logical tapes" sharing
  * space in a single underlying file.  (But that "file" may be multiple files
  * if needed to escape OS limits on file size; buffile.c handles that for us.)
@@ -165,10 +131,6 @@ struct LogicalTapeSet
 	long	   *freeBlocks;		/* resizable array */
 	int			nFreeBlocks;	/* # of currently free blocks */
 	int			freeBlocksLen;	/* current allocated length of freeBlocks[] */
-
-	/* The array of logical tapes. */
-	int			nTapes;			/* # of logical tapes in set */
-	LogicalTape tapes[FLEXIBLE_ARRAY_MEMBER];	/* has nTapes nentries */
 };
 
 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
@@ -187,7 +149,7 @@ static long ltsRecallNextBlockNum(LogicalTapeSet *lts,
 					  bool frozen);
 static long ltsRecallPrevBlockNum(LogicalTapeSet *lts,
 					  IndirectBlock *indirect);
-static void ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt);
+static void ltsDumpBuffer(LogicalTape *lt);
 
 
 /*
@@ -509,18 +471,14 @@ ltsRecallPrevBlockNum(LogicalTapeSet *lts,
  * Each tape is initialized in write state.
  */
 LogicalTapeSet *
-LogicalTapeSetCreate(int ntapes)
+LogicalTapeSetCreate(void)
 {
 	LogicalTapeSet *lts;
-	LogicalTape *lt;
-	int			i;
 
 	/*
 	 * Create top-level struct including per-tape LogicalTape structs.
 	 */
-	Assert(ntapes > 0);
-	lts = (LogicalTapeSet *) palloc(offsetof(LogicalTapeSet, tapes) +
-									ntapes * sizeof(LogicalTape));
+	lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet));
 	lts->pfile = BufFileCreateTemp(false);
 	lts->nFileBlocks = 0L;
 	lts->forgetFreeSpace = false;
@@ -528,7 +486,34 @@ LogicalTapeSetCreate(int ntapes)
 	lts->freeBlocksLen = 32;	/* reasonable initial guess */
 	lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
 	lts->nFreeBlocks = 0;
-	lts->nTapes = ntapes;
+
+	return lts;
+}
+
+/*
+ * Close a logical tape set and release all resources.
+ *
+ * NOTE: This doesn't close any open tapes.
+ */
+void
+LogicalTapeSetClose(LogicalTapeSet *lts)
+{
+	BufFileClose(lts->pfile);
+	pfree(lts->freeBlocks);
+	pfree(lts);
+}
+
+/*
+ * Create a bunch of tapes in the given tapeset.  Returns an
+ * array of LogicalTapes, with 'ntapes' entries.
+ */
+LogicalTape *
+LogicalTapeCreate(LogicalTapeSet *lts, int ntapes)
+{
+	LogicalTape *tapes;
+	int			i;
+
+	tapes = palloc(ntapes * sizeof(LogicalTape));
 
 	/*
 	 * Initialize per-tape structs.  Note we allocate the I/O buffer and
@@ -538,7 +523,9 @@ LogicalTapeSetCreate(int ntapes)
 	 */
 	for (i = 0; i < ntapes; i++)
 	{
-		lt = &lts->tapes[i];
+		LogicalTape *lt = &tapes[i];
+
+		lt->tapeSet = lts;
 		lt->indirect = NULL;
 		lt->writing = true;
 		lt->frozen = false;
@@ -550,34 +537,8 @@ LogicalTapeSetCreate(int ntapes)
 		lt->pos = 0;
 		lt->nbytes = 0;
 	}
-	return lts;
-}
 
-/*
- * Close a logical tape set and release all resources.
- */
-void
-LogicalTapeSetClose(LogicalTapeSet *lts)
-{
-	LogicalTape *lt;
-	IndirectBlock *ib,
-			   *nextib;
-	int			i;
-
-	BufFileClose(lts->pfile);
-	for (i = 0; i < lts->nTapes; i++)
-	{
-		lt = &lts->tapes[i];
-		for (ib = lt->indirect; ib != NULL; ib = nextib)
-		{
-			nextib = ib->nextup;
-			pfree(ib);
-		}
-		if (lt->buffer)
-			pfree(lt->buffer);
-	}
-	pfree(lts->freeBlocks);
-	pfree(lts);
+	return tapes;
 }
 
 /*
@@ -596,16 +557,25 @@ LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts)
 }
 
 /*
+ * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
+ */
+long
+LogicalTapeSetBlocks(LogicalTapeSet *lts)
+{
+	return lts->nFileBlocks;
+}
+
+/*
  * Dump the dirty buffer of a logical tape.
  */
 static void
-ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+ltsDumpBuffer(LogicalTape *lt)
 {
-	long		datablock = ltsGetFreeBlock(lts);
+	long		datablock = ltsGetFreeBlock(lt->tapeSet);
 
 	Assert(lt->dirty);
-	ltsWriteBlock(lts, datablock, (void *) lt->buffer);
-	ltsRecordBlockNum(lts, lt->indirect, datablock);
+	ltsWriteBlock(lt->tapeSet, datablock, (void *) lt->buffer);
+	ltsRecordBlockNum(lt->tapeSet, lt->indirect, datablock);
 	lt->dirty = false;
 	/* Caller must do other state update as needed */
 }
@@ -616,14 +586,10 @@ ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt)
  * There are no error returns; we ereport() on failure.
  */
 void
-LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
-				 void *ptr, size_t size)
+LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size)
 {
-	LogicalTape *lt;
 	size_t		nthistime;
 
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(lt->writing);
 
 	/* Allocate data buffer and first indirect block on first write */
@@ -642,7 +608,7 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 		{
 			/* Buffer full, dump it out */
 			if (lt->dirty)
-				ltsDumpBuffer(lts, lt);
+				ltsDumpBuffer(lt);
 			else
 			{
 				/* Hmm, went directly from reading to writing? */
@@ -677,14 +643,10 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
  * opposite of the previous tape state.
  */
 void
-LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
+LogicalTapeRewind(LogicalTape *lt, bool forWrite)
 {
-	LogicalTape *lt;
 	long		datablocknum;
 
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
-
 	if (!forWrite)
 	{
 		if (lt->writing)
@@ -695,10 +657,10 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
 			 * (destructive) read.
 			 */
 			if (lt->dirty)
-				ltsDumpBuffer(lts, lt);
+				ltsDumpBuffer(lt);
 			lt->lastBlockBytes = lt->nbytes;
 			lt->writing = false;
-			datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
+			datablocknum = ltsRewindIndirectBlock(lt->tapeSet, lt->indirect, false);
 		}
 		else
 		{
@@ -707,7 +669,7 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
 			 * pass.
 			 */
 			Assert(lt->frozen);
-			datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
+			datablocknum = ltsRewindFrozenIndirectBlock(lt->tapeSet, lt->indirect);
 		}
 		/* Read the first block, or reset if tape is empty */
 		lt->curBlockNumber = 0L;
@@ -715,9 +677,9 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
 		lt->nbytes = 0;
 		if (datablocknum != -1L)
 		{
-			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+			ltsReadBlock(lt->tapeSet, datablocknum, (void *) lt->buffer);
 			if (!lt->frozen)
-				ltsReleaseBlock(lts, datablocknum);
+				ltsReleaseBlock(lt->tapeSet, datablocknum);
 			lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
 				BLCKSZ : lt->lastBlockBytes;
 		}
@@ -763,15 +725,11 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
  * Early EOF is indicated by return value less than #bytes requested.
  */
 size_t
-LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
-				void *ptr, size_t size)
+LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size)
 {
-	LogicalTape *lt;
 	size_t		nread = 0;
 	size_t		nthistime;
 
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(!lt->writing);
 
 	while (size > 0)
@@ -779,16 +737,16 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
 		if (lt->pos >= lt->nbytes)
 		{
 			/* Try to load more data into buffer. */
-			long		datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
+			long		datablocknum = ltsRecallNextBlockNum(lt->tapeSet, lt->indirect,
 															 lt->frozen);
 
 			if (datablocknum == -1L)
 				break;			/* EOF */
 			lt->curBlockNumber++;
 			lt->pos = 0;
-			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+			ltsReadBlock(lt->tapeSet, datablocknum, (void *) lt->buffer);
 			if (!lt->frozen)
-				ltsReleaseBlock(lts, datablocknum);
+				ltsReleaseBlock(lt->tapeSet, datablocknum);
 			lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
 				BLCKSZ : lt->lastBlockBytes;
 			if (lt->nbytes <= 0)
@@ -823,13 +781,10 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
  * for-read call is OK but not necessary.
  */
 void
-LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
+LogicalTapeFreeze(LogicalTape *lt)
 {
-	LogicalTape *lt;
 	long		datablocknum;
 
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(lt->writing);
 
 	/*
@@ -837,18 +792,18 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
 	 * partial indirect blocks, rewind for nondestructive read.
 	 */
 	if (lt->dirty)
-		ltsDumpBuffer(lts, lt);
+		ltsDumpBuffer(lt);
 	lt->lastBlockBytes = lt->nbytes;
 	lt->writing = false;
 	lt->frozen = true;
-	datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
+	datablocknum = ltsRewindIndirectBlock(lt->tapeSet, lt->indirect, true);
 	/* Read the first block, or reset if tape is empty */
 	lt->curBlockNumber = 0L;
 	lt->pos = 0;
 	lt->nbytes = 0;
 	if (datablocknum != -1L)
 	{
-		ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+		ltsReadBlock(lt->tapeSet, datablocknum, (void *) lt->buffer);
 		lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
 			BLCKSZ : lt->lastBlockBytes;
 	}
@@ -866,14 +821,11 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
  * data before the current point (in which case there's no state change).
  */
 bool
-LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
+LogicalTapeBackspace(LogicalTape *lt, size_t size)
 {
-	LogicalTape *lt;
 	long		nblocks;
 	int			newpos;
 
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(lt->frozen);
 
 	/*
@@ -908,14 +860,14 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
 	 */
 	while (nblocks-- > 0)
 	{
-		long		datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
+		long		datablocknum = ltsRecallPrevBlockNum(lt->tapeSet, lt->indirect);
 
 		if (datablocknum == -1L)
 			elog(ERROR, "unexpected end of tape");
 		lt->curBlockNumber--;
 		if (nblocks == 0)
 		{
-			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+			ltsReadBlock(lt->tapeSet, datablocknum, (void *) lt->buffer);
 			lt->nbytes = BLCKSZ;
 		}
 	}
@@ -932,13 +884,8 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
  * data in the tape (in which case there's no state change).
  */
 bool
-LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
-				long blocknum, int offset)
+LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset)
 {
-	LogicalTape *lt;
-
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	Assert(lt->frozen);
 	Assert(offset >= 0 && offset <= BLCKSZ);
 
@@ -965,22 +912,22 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
 	 */
 	while (lt->curBlockNumber > blocknum)
 	{
-		long		datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
+		long		datablocknum = ltsRecallPrevBlockNum(lt->tapeSet, lt->indirect);
 
 		if (datablocknum == -1L)
 			elog(ERROR, "unexpected end of tape");
 		if (--lt->curBlockNumber == blocknum)
-			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+			ltsReadBlock(lt->tapeSet, datablocknum, (void *) lt->buffer);
 	}
 	while (lt->curBlockNumber < blocknum)
 	{
-		long		datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
+		long		datablocknum = ltsRecallNextBlockNum(lt->tapeSet, lt->indirect,
 														 lt->frozen);
 
 		if (datablocknum == -1L)
 			elog(ERROR, "unexpected end of tape");
 		if (++lt->curBlockNumber == blocknum)
-			ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+			ltsReadBlock(lt->tapeSet, datablocknum, (void *) lt->buffer);
 	}
 	lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
 		BLCKSZ : lt->lastBlockBytes;
@@ -995,22 +942,8 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
  * the position for a seek after freezing.  Not clear if anyone needs that.
  */
 void
-LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
-				long *blocknum, int *offset)
+LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset)
 {
-	LogicalTape *lt;
-
-	Assert(tapenum >= 0 && tapenum < lts->nTapes);
-	lt = &lts->tapes[tapenum];
 	*blocknum = lt->curBlockNumber;
 	*offset = lt->pos;
 }
-
-/*
- * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
- */
-long
-LogicalTapeSetBlocks(LogicalTapeSet *lts)
-{
-	return lts->nFileBlocks;
-}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 16ceb30..379ab12 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -265,6 +265,7 @@ struct Tuplesortstate
 	MemoryContext sortcontext;	/* memory context holding most sort data */
 	MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */
 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */
+	LogicalTape *tapes;			/* logtape.c object for tapes in a temp file */
 
 	/*
 	 * These function pointers decouple the routines that must know what kind
@@ -538,9 +539,9 @@ struct Tuplesortstate
  */
 
 /* When using this macro, beware of double evaluation of len */
-#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \
+#define LogicalTapeReadExact(tape, ptr, len) \
 	do { \
-		if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \
+		if (LogicalTapeRead(tape, ptr, len) != (size_t) (len)) \
 			elog(ERROR, "unexpected end of data"); \
 	} while(0)
 
@@ -694,6 +695,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 	state->sortcontext = sortcontext;
 	state->tuplecontext = tuplecontext;
 	state->tapeset = NULL;
+	state->tapes = NULL;
 
 	state->memtupcount = 0;
 
@@ -1914,8 +1916,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 				 * end of file; back up to fetch last tuple's ending length
 				 * word.  If seek fails we must have a completely empty file.
 				 */
-				if (!LogicalTapeBackspace(state->tapeset,
-										  state->result_tape,
+				if (!LogicalTapeBackspace(&state->tapes[state->result_tape],
 										  2 * sizeof(unsigned int)))
 					return false;
 				state->eof_reached = false;
@@ -1926,8 +1927,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 				 * Back up and fetch previously-returned tuple's ending length
 				 * word.  If seek fails, assume we are at start of file.
 				 */
-				if (!LogicalTapeBackspace(state->tapeset,
-										  state->result_tape,
+				if (!LogicalTapeBackspace(&state->tapes[state->result_tape],
 										  sizeof(unsigned int)))
 					return false;
 				tuplen = getlen(state, state->result_tape, false);
@@ -1935,8 +1935,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 				/*
 				 * Back up to get ending length word of tuple before it.
 				 */
-				if (!LogicalTapeBackspace(state->tapeset,
-										  state->result_tape,
+				if (!LogicalTapeBackspace(&state->tapes[state->result_tape],
 										  tuplen + 2 * sizeof(unsigned int)))
 				{
 					/*
@@ -1945,8 +1944,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 					 * in forward direction (not obviously right, but that is
 					 * what in-memory case does).
 					 */
-					if (!LogicalTapeBackspace(state->tapeset,
-											  state->result_tape,
+					if (!LogicalTapeBackspace(&state->tapes[state->result_tape],
 											  tuplen + sizeof(unsigned int)))
 						elog(ERROR, "bogus tuple length in backward scan");
 					return false;
@@ -1960,8 +1958,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 			 * Note: READTUP expects we are positioned after the initial
 			 * length word of the tuple, so back up to that point.
 			 */
-			if (!LogicalTapeBackspace(state->tapeset,
-									  state->result_tape,
+			if (!LogicalTapeBackspace(&state->tapes[state->result_tape],
 									  tuplen))
 				elog(ERROR, "bogus tuple length in backward scan");
 			READTUP(state, stup, state->result_tape, tuplen);
@@ -2356,7 +2353,8 @@ inittapes(Tuplesortstate *state)
 	/*
 	 * Create the tape set and allocate the per-tape data arrays.
 	 */
-	state->tapeset = LogicalTapeSetCreate(maxTapes);
+	state->tapeset = LogicalTapeSetCreate();
+	state->tapes = LogicalTapeCreate(state->tapeset, maxTapes);
 
 	state->mergeactive = (bool *) palloc0(maxTapes * sizeof(bool));
 	state->mergenext = (int *) palloc0(maxTapes * sizeof(int));
@@ -2509,14 +2507,14 @@ mergeruns(Tuplesortstate *state)
 	{
 		state->result_tape = state->tp_tapenum[state->destTape];
 		/* must freeze and rewind the finished output tape */
-		LogicalTapeFreeze(state->tapeset, state->result_tape);
+		LogicalTapeFreeze(&state->tapes[state->result_tape]);
 		state->status = TSS_SORTEDONTAPE;
 		return;
 	}
 
 	/* End of step D2: rewind all output tapes to prepare for merging */
 	for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
-		LogicalTapeRewind(state->tapeset, tapenum, false);
+		LogicalTapeRewind(&state->tapes[tapenum], false);
 
 	for (;;)
 	{
@@ -2579,10 +2577,10 @@ mergeruns(Tuplesortstate *state)
 		if (--state->Level == 0)
 			break;
 		/* rewind output tape T to use as new input */
-		LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
+		LogicalTapeRewind(&state->tapes[state->tp_tapenum[state->tapeRange]],
 						  false);
 		/* rewind used-up input tape P, and prepare it for write pass */
-		LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
+		LogicalTapeRewind(&state->tapes[state->tp_tapenum[state->tapeRange - 1]],
 						  true);
 		state->tp_runs[state->tapeRange - 1] = 0;
 
@@ -2612,7 +2610,7 @@ mergeruns(Tuplesortstate *state)
 	 * a waste of cycles anyway...
 	 */
 	state->result_tape = state->tp_tapenum[state->tapeRange];
-	LogicalTapeFreeze(state->tapeset, state->result_tape);
+	LogicalTapeFreeze(&state->tapes[state->result_tape]);
 	state->status = TSS_SORTEDONTAPE;
 }
 
@@ -3481,9 +3479,7 @@ tuplesort_rescan(Tuplesortstate *state)
 			state->markpos_eof = false;
 			break;
 		case TSS_SORTEDONTAPE:
-			LogicalTapeRewind(state->tapeset,
-							  state->result_tape,
-							  false);
+			LogicalTapeRewind(&state->tapes[state->result_tape], false);
 			state->eof_reached = false;
 			state->markpos_block = 0L;
 			state->markpos_offset = 0;
@@ -3514,8 +3510,7 @@ tuplesort_markpos(Tuplesortstate *state)
 			state->markpos_eof = state->eof_reached;
 			break;
 		case TSS_SORTEDONTAPE:
-			LogicalTapeTell(state->tapeset,
-							state->result_tape,
+			LogicalTapeTell(&state->tapes[state->result_tape],
 							&state->markpos_block,
 							&state->markpos_offset);
 			state->markpos_eof = state->eof_reached;
@@ -3546,8 +3541,7 @@ tuplesort_restorepos(Tuplesortstate *state)
 			state->eof_reached = state->markpos_eof;
 			break;
 		case TSS_SORTEDONTAPE:
-			if (!LogicalTapeSeek(state->tapeset,
-								 state->result_tape,
+			if (!LogicalTapeSeek(&state->tapes[state->result_tape],
 								 state->markpos_block,
 								 state->markpos_offset))
 				elog(ERROR, "tuplesort_restorepos failed");
@@ -3884,7 +3878,7 @@ getlen(Tuplesortstate *state, int tapenum, bool eofOK)
 {
 	unsigned int len;
 
-	if (LogicalTapeRead(state->tapeset, tapenum,
+	if (LogicalTapeRead(&state->tapes[tapenum],
 						&len, sizeof(len)) != sizeof(len))
 		elog(ERROR, "unexpected end of tape");
 	if (len == 0 && !eofOK)
@@ -3897,7 +3891,7 @@ markrunend(Tuplesortstate *state, int tapenum)
 {
 	unsigned int len = 0;
 
-	LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
+	LogicalTapeWrite(&state->tapes[tapenum], (void *) &len, sizeof(len));
 }
 
 /*
@@ -4093,12 +4087,12 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup)
 	/* total on-disk footprint: */
 	unsigned int tuplen = tupbodylen + sizeof(int);
 
-	LogicalTapeWrite(state->tapeset, tapenum,
+	LogicalTapeWrite(&state->tapes[tapenum],
 					 (void *) &tuplen, sizeof(tuplen));
-	LogicalTapeWrite(state->tapeset, tapenum,
+	LogicalTapeWrite(&state->tapes[tapenum],
 					 (void *) tupbody, tupbodylen);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeWrite(state->tapeset, tapenum,
+		LogicalTapeWrite(&state->tapes[tapenum],
 						 (void *) &tuplen, sizeof(tuplen));
 
 	FREEMEM(state, GetMemoryChunkSpace(tuple));
@@ -4117,10 +4111,10 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
 
 	/* read in the tuple proper */
 	tuple->t_len = tuplen;
-	LogicalTapeReadExact(state->tapeset, tapenum,
+	LogicalTapeReadExact(&state->tapes[tapenum],
 						 tupbody, tupbodylen);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeReadExact(state->tapeset, tapenum,
+		LogicalTapeReadExact(&state->tapes[tapenum],
 							 &tuplen, sizeof(tuplen));
 	stup->tuple = (void *) tuple;
 	/* set up first-column key value */
@@ -4334,14 +4328,14 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup)
 	unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int);
 
 	/* We need to store t_self, but not other fields of HeapTupleData */
-	LogicalTapeWrite(state->tapeset, tapenum,
+	LogicalTapeWrite(&state->tapes[tapenum],
 					 &tuplen, sizeof(tuplen));
-	LogicalTapeWrite(state->tapeset, tapenum,
+	LogicalTapeWrite(&state->tapes[tapenum],
 					 &tuple->t_self, sizeof(ItemPointerData));
-	LogicalTapeWrite(state->tapeset, tapenum,
+	LogicalTapeWrite(&state->tapes[tapenum],
 					 tuple->t_data, tuple->t_len);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeWrite(state->tapeset, tapenum,
+		LogicalTapeWrite(&state->tapes[tapenum],
 						 &tuplen, sizeof(tuplen));
 
 	FREEMEM(state, GetMemoryChunkSpace(tuple));
@@ -4360,15 +4354,15 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
 	/* Reconstruct the HeapTupleData header */
 	tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
 	tuple->t_len = t_len;
-	LogicalTapeReadExact(state->tapeset, tapenum,
+	LogicalTapeReadExact(&state->tapes[tapenum],
 						 &tuple->t_self, sizeof(ItemPointerData));
 	/* We don't currently bother to reconstruct t_tableOid */
 	tuple->t_tableOid = InvalidOid;
 	/* Read in the tuple body */
-	LogicalTapeReadExact(state->tapeset, tapenum,
+	LogicalTapeReadExact(&state->tapes[tapenum],
 						 tuple->t_data, tuple->t_len);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeReadExact(state->tapeset, tapenum,
+		LogicalTapeReadExact(&state->tapes[tapenum],
 							 &tuplen, sizeof(tuplen));
 	stup->tuple = (void *) tuple;
 	/* set up first-column key value, if it's a simple column */
@@ -4651,12 +4645,12 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup)
 	unsigned int tuplen;
 
 	tuplen = IndexTupleSize(tuple) + sizeof(tuplen);
-	LogicalTapeWrite(state->tapeset, tapenum,
+	LogicalTapeWrite(&state->tapes[tapenum],
 					 (void *) &tuplen, sizeof(tuplen));
-	LogicalTapeWrite(state->tapeset, tapenum,
+	LogicalTapeWrite(&state->tapes[tapenum],
 					 (void *) tuple, IndexTupleSize(tuple));
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeWrite(state->tapeset, tapenum,
+		LogicalTapeWrite(&state->tapes[tapenum],
 						 (void *) &tuplen, sizeof(tuplen));
 
 	FREEMEM(state, GetMemoryChunkSpace(tuple));
@@ -4670,10 +4664,10 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
 	unsigned int tuplen = len - sizeof(unsigned int);
 	IndexTuple	tuple = (IndexTuple) readtup_alloc(state, tapenum, tuplen);
 
-	LogicalTapeReadExact(state->tapeset, tapenum,
+	LogicalTapeReadExact(&state->tapes[tapenum],
 						 tuple, tuplen);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeReadExact(state->tapeset, tapenum,
+		LogicalTapeReadExact(&state->tapes[tapenum],
 							 &tuplen, sizeof(tuplen));
 	stup->tuple = (void *) tuple;
 	/* set up first-column key value */
@@ -4747,12 +4741,12 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
 
 	writtenlen = tuplen + sizeof(unsigned int);
 
-	LogicalTapeWrite(state->tapeset, tapenum,
+	LogicalTapeWrite(&state->tapes[tapenum],
 					 (void *) &writtenlen, sizeof(writtenlen));
-	LogicalTapeWrite(state->tapeset, tapenum,
+	LogicalTapeWrite(&state->tapes[tapenum],
 					 waddr, tuplen);
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeWrite(state->tapeset, tapenum,
+		LogicalTapeWrite(&state->tapes[tapenum],
 						 (void *) &writtenlen, sizeof(writtenlen));
 
 	if (stup->tuple)
@@ -4778,7 +4772,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 	else if (!state->tuples)
 	{
 		Assert(tuplen == sizeof(Datum));
-		LogicalTapeReadExact(state->tapeset, tapenum,
+		LogicalTapeReadExact(&state->tapes[tapenum],
 							 &stup->datum1, tuplen);
 		stup->isnull1 = false;
 		stup->tuple = NULL;
@@ -4787,7 +4781,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 	{
 		void	   *raddr = readtup_alloc(state, tapenum, tuplen);
 
-		LogicalTapeReadExact(state->tapeset, tapenum,
+		LogicalTapeReadExact(&state->tapes[tapenum],
 							 raddr, tuplen);
 		stup->datum1 = PointerGetDatum(raddr);
 		stup->isnull1 = false;
@@ -4795,7 +4789,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 	}
 
 	if (state->randomAccess)	/* need trailing length word? */
-		LogicalTapeReadExact(state->tapeset, tapenum,
+		LogicalTapeReadExact(&state->tapes[tapenum],
 							 &tuplen, sizeof(tuplen));
 }
 
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index fa1e992..5f26089 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -16,29 +16,68 @@
 #ifndef LOGTAPE_H
 #define LOGTAPE_H
 
-/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
+/* LogicalTape is an opaque type whose details are not known outside logtape.c. */
 
 typedef struct LogicalTapeSet LogicalTapeSet;
 
+typedef struct IndirectBlock IndirectBlock;
+
+/*
+ * This data structure represents a single "logical tape" within the set
+ * of logical tapes stored in the same file.  We must keep track of the
+ * current partially-read-or-written data block as well as the active
+ * indirect block level(s).
+ *
+ * This should be treated as an opaque struct outside logtape.c. It's defined
+ * here so that LogicalTape structs can conveniently be held in an array.
+ */
+typedef struct LogicalTape
+{
+	LogicalTapeSet *tapeSet;	/* tape set this tape is part of */
+
+	IndirectBlock *indirect;	/* bottom of my indirect-block hierarchy */
+	bool		writing;		/* T while in write phase */
+	bool		frozen;			/* T if blocks should not be freed when read */
+	bool		dirty;			/* does buffer need to be written? */
+
+	/*
+	 * The total data volume in the logical tape is numFullBlocks * BLCKSZ +
+	 * lastBlockBytes.  BUT: we do not update lastBlockBytes during writing,
+	 * only at completion of a write phase.
+	 */
+	long		numFullBlocks;	/* number of complete blocks in log tape */
+	int			lastBlockBytes; /* valid bytes in last (incomplete) block */
+
+	/*
+	 * Buffer for current data block.  Note we don't bother to store the
+	 * actual file block number of the data block (during the write phase it
+	 * hasn't been assigned yet, and during read we don't care anymore). But
+	 * we do need the relative block number so we can detect end-of-tape while
+	 * reading.
+	 */
+	char	   *buffer;			/* physical buffer (separately palloc'd) */
+	long		curBlockNumber; /* this block's logical blk# within tape */
+	int			pos;			/* next read/write position in buffer */
+	int			nbytes;			/* total # of valid bytes in buffer */
+} LogicalTape;
+
+
 /*
  * prototypes for functions in logtape.c
  */
 
-extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes);
+extern LogicalTapeSet *LogicalTapeSetCreate(void);
 extern void LogicalTapeSetClose(LogicalTapeSet *lts);
 extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
-extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
-				void *ptr, size_t size);
-extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
-				 void *ptr, size_t size);
-extern void LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite);
-extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
-extern bool LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
-					 size_t size);
-extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
-				long blocknum, int offset);
-extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
-				long *blocknum, int *offset);
 extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
 
+extern LogicalTape *LogicalTapeCreate(LogicalTapeSet *lts, int ntapes);
+extern size_t LogicalTapeRead(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeWrite(LogicalTape *lt, void *ptr, size_t size);
+extern void LogicalTapeRewind(LogicalTape *lt, bool forWrite);
+extern void LogicalTapeFreeze(LogicalTape *lt);
+extern bool LogicalTapeBackspace(LogicalTape *lt, size_t size);
+extern bool LogicalTapeSeek(LogicalTape *lt, long blocknum, int offset);
+extern void LogicalTapeTell(LogicalTape *lt, long *blocknum, int *offset);
+
 #endif   /* LOGTAPE_H */
-- 
2.9.3
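
For readers skimming the header change above, here is a minimal sketch of the per-tape handle style and of the size formula from the struct comment (numFullBlocks * BLCKSZ + lastBlockBytes). It is not code from the patch: DemoTape, demo_tape_write, demo_tape_bytes and DEMO_BLCKSZ are hypothetical stand-ins, no data is actually buffered, and unlike the real struct (which updates lastBlockBytes only when a write phase completes) the sketch updates its counters on every write.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_BLCKSZ 8192		/* assumed stand-in for PostgreSQL's BLCKSZ */

/* Hypothetical, heavily simplified stand-in for the patch's LogicalTape. */
typedef struct DemoTape
{
	long		numFullBlocks;	/* number of complete blocks on the tape */
	int			lastBlockBytes; /* valid bytes in the last, partial block */
} DemoTape;

/*
 * Account for a write of 'size' bytes to one tape.  The caller passes the
 * tape itself, mirroring the new LogicalTapeWrite(LogicalTape *lt, ...)
 * calling convention, rather than a (tapeset, tapenum) pair.
 */
static void
demo_tape_write(DemoTape *lt, size_t size)
{
	size_t		total;

	assert(lt != NULL);
	total = (size_t) lt->lastBlockBytes + size;
	lt->numFullBlocks += (long) (total / DEMO_BLCKSZ);
	lt->lastBlockBytes = (int) (total % DEMO_BLCKSZ);
}

/* Total data volume: numFullBlocks * BLCKSZ + lastBlockBytes. */
static long
demo_tape_bytes(const DemoTape *lt)
{
	return lt->numFullBlocks * (long) DEMO_BLCKSZ + lt->lastBlockBytes;
}
```

With an array such as DemoTape tapes[2], a caller would write demo_tape_write(&tapes[1], 20000), which has the same shape as the &state->tapes[tapenum] call sites in the tuplesort.c hunks; each tape carries its own counters, so no tape-set lookup by number is needed.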
